<python module> Distributed Task Queue: Celery

Date: Sept. 12, 2018


Celery

Introduction

Celery is a distributed asynchronous task queue built in Python. It supports asynchronous task processing as well as scheduled (periodic) tasks.

Celery documentation (Chinese)

Advantages

  • High availability: if a task fails or is interrupted while running, Celery automatically retries it
  • Fast: a single Celery process can handle millions of tasks per minute
  • Flexible: almost every component of Celery can be extended or customized

Workflow

  1. The app sends an asynchronous task to the queue (Redis or RabbitMQ)
  2. A Celery worker picks the task up from the queue and executes it
  3. The app fetches the task's result from the result backend (here also Redis)

Basic usage

The examples below use Redis; if you want to use RabbitMQ instead, see docs.celeryproject.org.

Install Celery

yum install -y redis
pip2.7 install redis
pip2.7 install celery

Configuration and code

tasks.py

from celery import Celery

# 'TASK' is the app name; Redis serves as both the broker and the result backend
app = Celery('TASK',
             broker='redis://localhost',
             backend='redis://localhost')

@app.task
def add(x, y):
    print("running...", x, y)
    return x + y

For Redis, the full URL format is redis://:password@hostname:port/db_number; see docs.celeryproject.org for more configuration options.
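
For example, a fully qualified configuration might look like the sketch below (the password, host, and db numbers are placeholders for your own values):

app = Celery('TASK',
             broker='redis://:s3cret@127.0.0.1:6379/0',    # queue in db 0
             backend='redis://:s3cret@127.0.0.1:6379/1')   # results in db 1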

Start the Celery worker

Note that the argument to -A must match the module name of tasks.py, otherwise you will get ImportError: No module named ****

[ec2-user@qbj3-phpuser-10 ~]$ celery -A tasks worker --loglevel=info

 -------------- celery@qbj3-phpuser-10 v4.1.0 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-2.6.32-696.13.2.el6.x86_64-x86_64-with-centos-6.9-Final 2018-04-02 16:59:58
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7fb353399510
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2018-04-02 16:59:58,804: INFO/MainProcess] Connected to redis://localhost:6379//
[2018-04-02 16:59:58,809: INFO/MainProcess] mingle: searching for neighbors
[2018-04-02 16:59:59,822: INFO/MainProcess] mingle: all alone
[2018-04-02 16:59:59,831: INFO/MainProcess] celery@qbj3-phpuser-10 ready.

You can see that the loaded tasks include tasks.add.

Calling Celery from a Python client

Start a Python shell in the directory containing tasks.py:

>>> from tasks import add
>>> add.delay(4, 4)
<AsyncResult: 62504702-efa3-4a14-b7ba-2288c7da8524>

In the worker's log you can see:

[2018-04-02 17:02:12,348: INFO/MainProcess] Received task: tasks.add[62504702-efa3-4a14-b7ba-2288c7da8524]  
[2018-04-02 17:02:12,349: WARNING/ForkPoolWorker-2] ('running...', 4, 4)
[2018-04-02 17:02:12,354: INFO/ForkPoolWorker-2] Task tasks.add[62504702-efa3-4a14-b7ba-2288c7da8524] succeeded in 0.00469902399891s: 8

Fetch the execution result:

>>> result = add.delay(4, 4)
>>> print result
bff8f572-9287-43fa-94b7-74c1890c8b4b
>>> result.get(timeout=1)
8
[2018-04-02 17:02:18,609: INFO/MainProcess] Received task: tasks.add[bff8f572-9287-43fa-94b7-74c1890c8b4b]  
[2018-04-02 17:02:18,610: WARNING/ForkPoolWorker-2] ('running...', 4, 4)
[2018-04-02 17:02:18,611: INFO/ForkPoolWorker-2] Task tasks.add[bff8f572-9287-43fa-94b7-74c1890c8b4b] succeeded in 0.000636545000816s: 8

How results are stored in Redis

You can see the result keys in Redis; list them with keys * in redis-cli.

Inspect the contents of a key:

127.0.0.1:6379> get celery-task-meta-bff8f572-9287-43fa-94b7-74c1890c8b4b
"{\"status\": \"SUCCESS\", \"traceback\": null, \"result\": 8, \"task_id\": \"bff8f572-9287-43fa-94b7-74c1890c8b4b\", \"children\": []}"

The value is a JSON-encoded dictionary:

  • status: the task's state
  • task_id: the task's ID
  • children: the task's subtasks
  • traceback: the traceback if the task failed
  • result: the task's return value (see the reading sketch below)
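
To read these results outside of Celery, here is a minimal sketch using the redis-py client, assuming the default localhost setup used above:

import json
import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)
# each result lives under a key named celery-task-meta-<task_id>
for key in r.keys('celery-task-meta-*'):
    meta = json.loads(r.get(key))
    print(key, meta['status'], meta['result'])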

Configuring Celery as an application

[ec2-user@qbj3-phpuser-10 ~]$ tree proj/
proj/
├── celery.py
├── __init__.py
└── tasks.py

0 directories, 3 files

Here celery.py acts as the configuration module:

  • proj/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='redis://localhost',
             backend='redis://localhost',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()
  • proj/tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

Start the worker service

[ec2-user@qbj3-phpuser-10 ~]$ celery -A proj worker -l info

 -------------- celery@qbj3-phpuser-10 v4.1.0 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-2.6.32-696.13.2.el6.x86_64-x86_64-with-centos-6.9-Final 2018-04-03 18:02:50
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         proj:0x7f992450e050
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . proj.tasks.add
  . proj.tasks.mul
  . proj.tasks.xsum

[2018-04-03 18:02:50,240: INFO/MainProcess] Connected to redis://localhost:6379//
[2018-04-03 18:02:50,246: INFO/MainProcess] mingle: searching for neighbors
[2018-04-03 18:02:51,259: INFO/MainProcess] mingle: all alone
[2018-04-03 18:02:51,269: INFO/MainProcess] celery@qbj3-phpuser-10 ready.
[2018-04-03 18:03:04,992: INFO/MainProcess] Received task: proj.tasks.mul[a792a5b9-1edf-469a-a140-dce368e410db]  
[2018-04-03 18:03:04,998: INFO/ForkPoolWorker-1] Task proj.tasks.mul[a792a5b9-1edf-469a-a140-dce368e410db] succeeded in 0.00444237199554s: 16

Calling from a Python client

[ec2-user@qbj3-phpuser-10 ~]$ python
Python 2.7.14 (default, Oct 27 2017, 10:09:21) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-18)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from proj.tasks import add,mul 
>>> result = mul.delay(4, 4) 
>>> result.get()
16

Running workers in the background

  • Start a worker
[ec2-user@qbj3-phpuser-10 ~]$ celery multi start w1 -A proj -l info
celery multi v4.1.0 (latentcall)
> Starting nodes...
    > w1@qbj3-phpuser-10: OK
  • Restart a worker
[ec2-user@qbj3-phpuser-10 ~]$ celery  multi restart w1 -A proj -l info
celery multi v4.1.0 (latentcall)
> Stopping nodes...
    > w1@qbj3-phpuser-10: TERM -> 5305
> Waiting for 1 node -> 5305.....
    > w1@qbj3-phpuser-10: OK
> Restarting node w1@qbj3-phpuser-10: OK
> Waiting for 1 node -> None...
  • Stop a worker
[ec2-user@qbj3-phpuser-10 ~]$ celery multi stop w1 -A proj -l info
celery multi v4.1.0 (latentcall)
> Stopping nodes...
    > w1@qbj3-phpuser-10: TERM -> 5377

stop is an asynchronous command and does not wait for the worker to shut down. Use stopwait instead to ensure all currently executing tasks are completed before exiting:

  • Stop a worker and wait for its tasks to finish (stopwait)
[ec2-user@qbj3-phpuser-10 ~]$ celery multi start w1 -A proj -l info
celery multi v4.1.0 (latentcall)
> Starting nodes...
    > w1@qbj3-phpuser-10: OK
[ec2-user@qbj3-phpuser-10 ~]$ celery multi stopwait w1 -A proj -l info
celery multi v4.1.0 (latentcall)
> Stopping nodes...
    > w1@qbj3-phpuser-10: TERM -> 6517
> Waiting for 1 node -> 6517.....
    > w1@qbj3-phpuser-10: OK
> w1@qbj3-phpuser-10: DOWN
> Waiting for 1 node -> None...

Starting multiple workers

[ec2-user@qbj3-phpuser-10 ~]$ celery multi start w1 -A proj -l info
celery multi v4.1.0 (latentcall)
> Starting nodes...
    > w1@qbj3-phpuser-10: OK
[ec2-user@qbj3-phpuser-10 ~]$ celery multi start w2 -A proj -l info
celery multi v4.1.0 (latentcall)
> Starting nodes...
    > w2@qbj3-phpuser-10: OK
[ec2-user@qbj3-phpuser-10 ~]$ celery multi start w3 -A proj -l info
celery multi v4.1.0 (latentcall)
> Starting nodes...
    > w3@qbj3-phpuser-10: OK
[ec2-user@qbj3-phpuser-10 ~]$ ps -ef | grep celery
ec2-user 19284     1  7 19:54 ?        00:00:01 /usr/local/bin/python2.7 -m celery worker -l info -A proj --logfile=w1%I.log --pidfile=w1.pid --hostname=w1@qbj3-phpuser-10
ec2-user 19294     1  8 19:54 ?        00:00:01 /usr/local/bin/python2.7 -m celery worker -l info -A proj --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@qbj3-phpuser-10
ec2-user 19298 19284  0 19:54 ?        00:00:00 /usr/local/bin/python2.7 -m celery worker -l info -A proj --logfile=w1%I.log --pidfile=w1.pid --hostname=w1@qbj3-phpuser-10
ec2-user 19299 19284  0 19:54 ?        00:00:00 /usr/local/bin/python2.7 -m celery worker -l info -A proj --logfile=w1%I.log --pidfile=w1.pid --hostname=w1@qbj3-phpuser-10
ec2-user 19327 19294  0 19:54 ?        00:00:00 /usr/local/bin/python2.7 -m celery worker -l info -A proj --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@qbj3-phpuser-10
ec2-user 19328 19294  0 19:54 ?        00:00:00 /usr/local/bin/python2.7 -m celery worker -l info -A proj --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@qbj3-phpuser-10
ec2-user 19330     1  9 19:54 ?        00:00:01 /usr/local/bin/python2.7 -m celery worker -l info -A proj --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@qbj3-phpuser-10
ec2-user 19372 19330  0 19:54 ?        00:00:00 /usr/local/bin/python2.7 -m celery worker -l info -A proj --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@qbj3-phpuser-10
ec2-user 19373 19330  0 19:54 ?        00:00:00 /usr/local/bin/python2.7 -m celery worker -l info -A proj --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@qbj3-phpuser-10
ec2-user 19443 27371  0 19:54 pts/0    00:00:00 grep celery

Stopping multiple workers

[ec2-user@qbj3-phpuser-10 ~]$ celery multi stopwait w1 w2 w3
celery multi v4.1.0 (latentcall)
> Stopping nodes...
    > w3@qbj3-phpuser-10: TERM -> 19330
    > w1@qbj3-phpuser-10: TERM -> 19284
    > w2@qbj3-phpuser-10: TERM -> 19294
> Waiting for 3 nodes -> 19330, 19284, 19294.......
    > w3@qbj3-phpuser-10: OK
> w3@qbj3-phpuser-10: DOWN
> Waiting for 3 nodes -> None, None, None....
    > w1@qbj3-phpuser-10: OK
> w1@qbj3-phpuser-10: DOWN
> Waiting for 2 nodes -> None, None....
    > w2@qbj3-phpuser-10: OK
> w2@qbj3-phpuser-10: DOWN
> Waiting for 1 node -> None...

Things to note

Byte-type results

If a result comes back as bytes, call decode() on it.
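
A minimal sketch of the pattern (some_task here is a hypothetical task that returns bytes):

result = some_task.delay().get()
if isinstance(result, bytes):
    result = result.decode('utf-8')  # assumes UTF-8 content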

Adding and modifying task code

Newly added tasks are picked up, but changes to existing tasks are not; the worker must be restarted for those.

More parameters

  • Return the error instead of raising it
result.get(propagate=False)
  • traceback returns the worker-side traceback, file and line numbers included (a combined sketch follows below)
result.traceback
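
A sketch combining both, assuming the add task from above is called with a bad argument so that it fails inside the worker:

result = add.delay(4, 'four')       # raises TypeError in the worker
print(result.get(propagate=False))  # returns the exception instead of raising it
print(result.traceback)             # the worker-side traceback as a string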

Celery periodic tasks

Celery supports periodic tasks: define when a task should run, and Celery executes it automatically on that schedule. The scheduler module is called celery beat, and it runs as a separate service that must be started on its own.

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 20 seconds, expiring after 10
    sender.add_periodic_task(20.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)
  • name can be used to tell periodic tasks apart (see the beat_schedule sketch after this list)
  • test is the task function defined below
  • on_after_configure.connect makes this function run automatically once the app is configured, registering the periodic tasks
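
The same schedules can also be declared statically with the standard beat_schedule setting; a minimal sketch for the 'hello' entry (the module path proj2.periodic_task is taken from the logs below):

app.conf.beat_schedule = {
    'add every 10': {
        'task': 'proj2.periodic_task.test',
        'schedule': 10.0,      # seconds
        'args': ('hello',),
    },
}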

Start the worker

[ec2-user@qbj3-phpuser-10 ~]$ celery -A proj2 worker -l info

 -------------- celery@qbj3-phpuser-10 v4.1.0 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-2.6.32-696.13.2.el6.x86_64-x86_64-with-centos-6.9-Final 2018-04-03 20:05:59
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         proj2:0x7f29e598b110
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . proj2.periodic_task.test

[2018-04-03 20:05:59,535: INFO/MainProcess] Connected to redis://localhost:6379//
[2018-04-03 20:05:59,542: INFO/MainProcess] mingle: searching for neighbors
[2018-04-03 20:06:00,555: INFO/MainProcess] mingle: all alone
[2018-04-03 20:06:00,563: INFO/MainProcess] celery@qbj3-phpuser-10 ready.
[2018-04-03 20:08:11,221: INFO/MainProcess] Received task: proj2.periodic_task.test[4746b121-39ad-4aad-93e6-16596e49d71a]  
[2018-04-03 20:08:11,222: WARNING/ForkPoolWorker-2] hello
[2018-04-03 20:08:11,227: INFO/ForkPoolWorker-2] Task proj2.periodic_task.test[4746b121-39ad-4aad-93e6-16596e49d71a] succeeded in 0.00472354600788s: None
[2018-04-03 20:08:21,205: INFO/MainProcess] Received task: proj2.periodic_task.test[65eee7e4-bc8d-4826-9c79-cb8a22ec46b6]   expires:[2018-04-03 12:08:31.202731+00:00]
[2018-04-03 20:08:21,206: INFO/MainProcess] Received task: proj2.periodic_task.test[1ad6f06e-d7e9-4c9d-be7c-faf054f147d1]  
[2018-04-03 20:08:21,206: WARNING/ForkPoolWorker-2] world
[2018-04-03 20:08:21,207: WARNING/ForkPoolWorker-1] hello
[2018-04-03 20:08:21,207: INFO/ForkPoolWorker-2] Task proj2.periodic_task.test[65eee7e4-bc8d-4826-9c79-cb8a22ec46b6] succeeded in 0.00109724298818s: None
[2018-04-03 20:08:21,211: INFO/ForkPoolWorker-1] Task proj2.periodic_task.test[1ad6f06e-d7e9-4c9d-be7c-faf054f147d1] succeeded in 0.00420732099155s: None
[2018-04-03 20:08:31,206: INFO/MainProcess] Received task: proj2.periodic_task.test[6d27cc31-aeb8-49d3-a285-2a2cc2cdf69d]  
[2018-04-03 20:08:31,207: WARNING/ForkPoolWorker-2] hello
[2018-04-03 20:08:31,208: INFO/ForkPoolWorker-2] Task proj2.periodic_task.test[6d27cc31-aeb8-49d3-a285-2a2cc2cdf69d] succeeded in 0.000891784002306s: None
[2018-04-03 20:08:41,206: INFO/MainProcess] Received task: proj2.periodic_task.test[3ea96b10-e198-4290-a491-f30467a24daf]  
[2018-04-03 20:08:41,207: WARNING/ForkPoolWorker-2] hello
[2018-04-03 20:08:41,208: INFO/ForkPoolWorker-2] Task proj2.periodic_task.test[3ea96b10-e198-4290-a491-f30467a24daf] succeeded in 0.000638974001049s: None
[2018-04-03 20:08:41,209: INFO/MainProcess] Received task: proj2.periodic_task.test[51dde51e-728a-4612-9f11-9665c52a298a]   expires:[2018-04-03 12:08:51.206678+00:00]
[2018-04-03 20:08:41,210: WARNING/ForkPoolWorker-2] world
[2018-04-03 20:08:41,210: INFO/ForkPoolWorker-2] Task proj2.periodic_task.test[51dde51e-728a-4612-9f11-9665c52a298a] succeeded in 0.000427602004493s: None

Start beat

[ec2-user@qbj3-phpuser-10 ~]$ celery -A  proj2.periodic_task  beat  -l  debug
celery beat v4.1.0 (latentcall) is starting.
__    -    ... __   -        _
LocalTime -> 2018-04-03 20:08:01
Configuration ->
    . broker -> redis://localhost:6379//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%DEBUG
    . maxinterval -> 5.00 minutes (300s)
[2018-04-03 20:08:01,198: DEBUG/MainProcess] Setting default socket timeout to 30
[2018-04-03 20:08:01,198: INFO/MainProcess] beat: Starting...
[2018-04-03 20:08:01,204: DEBUG/MainProcess] Current schedule:
<ScheduleEntry: proj2.periodic_task.test(u'Happy Mondays!') proj2.periodic_task.test(u'Happy Mondays!') <crontab: 30 7 1 * * (m/h/d/dM/MY)>
<ScheduleEntry: proj2.periodic_task.test(u'world') proj2.periodic_task.test(u'world') <freq: 20.00 seconds>
<ScheduleEntry: add every 10 proj2.periodic_task.test(u'hello') <freq: 10.00 seconds>
[2018-04-03 20:08:01,204: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
[2018-04-03 20:08:01,205: DEBUG/MainProcess] beat: Waking up in 9.99 seconds.
[2018-04-03 20:08:11,202: DEBUG/MainProcess] beat: Synchronizing schedule...
[2018-04-03 20:08:11,210: INFO/MainProcess] Scheduler: Sending due task add every 10 (proj2.periodic_task.test)
[2018-04-03 20:08:11,220: DEBUG/MainProcess] proj2.periodic_task.test sent. id->4746b121-39ad-4aad-93e6-16596e49d71a
[2018-04-03 20:08:11,220: DEBUG/MainProcess] beat: Waking up in 9.97 seconds.
[2018-04-03 20:08:21,202: INFO/MainProcess] Scheduler: Sending due task proj2.periodic_task.test(u'world') (proj2.periodic_task.test)
[2018-04-03 20:08:21,203: DEBUG/MainProcess] proj2.periodic_task.test sent. id->65eee7e4-bc8d-4826-9c79-cb8a22ec46b6
[2018-04-03 20:08:21,204: INFO/MainProcess] Scheduler: Sending due task add every 10 (proj2.periodic_task.test)
[2018-04-03 20:08:21,204: DEBUG/MainProcess] proj2.periodic_task.test sent. id->1ad6f06e-d7e9-4c9d-be7c-faf054f147d1
[2018-04-03 20:08:21,205: DEBUG/MainProcess] beat: Waking up in 9.99 seconds.
[2018-04-03 20:08:31,204: INFO/MainProcess] Scheduler: Sending due task add every 10 (proj2.periodic_task.test)
[2018-04-03 20:08:31,205: DEBUG/MainProcess] proj2.periodic_task.test sent. id->6d27cc31-aeb8-49d3-a285-2a2cc2cdf69d
[2018-04-03 20:08:31,206: DEBUG/MainProcess] beat: Waking up in 9.99 seconds.
[2018-04-03 20:08:41,204: INFO/MainProcess] Scheduler: Sending due task add every 10 (proj2.periodic_task.test)
[2018-04-03 20:08:41,206: DEBUG/MainProcess] proj2.periodic_task.test sent. id->3ea96b10-e198-4290-a491-f30467a24daf
[2018-04-03 20:08:41,206: INFO/MainProcess] Scheduler: Sending due task proj2.periodic_task.test(u'world') (proj2.periodic_task.test)
[2018-04-03 20:08:41,208: DEBUG/MainProcess] proj2.periodic_task.test sent. id->51dde51e-728a-4612-9f11-9665c52a298a
[2018-04-03 20:08:41,208: DEBUG/MainProcess] beat: Waking up in 9.99 seconds.

Using Celery with Django

Create the Django project

django-admin startproject CeleryTest
cd CeleryTest/
python manage.py startapp app

settings.py is configured to use SQLite by default. If you hit django.core.exceptions.ImproperlyConfigured: Error loading either pysqlite2 or sqlite3 modules (tried in that order): No module named _sqlite3

you will also need the following before the app can be created:

yum install -y sqlite-devel
find / -name _sqlite*.so
sudo cp /usr/lib64/python2.6/lib-dynload/_sqlite3.so /usr/local/lib/python2.7/lib-dynload/

Configure Celery

CeleryTest/CeleryTest/celery_config.py

Place it in the same directory as settings.py:


import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CeleryTest.settings')
app = Celery('CeleryTest')
# With namespace='CELERY', only settings prefixed with CELERY_ are read
app.config_from_object('django.conf:settings', namespace='CELERY')
# Auto-discover tasks.py modules in all installed apps
app.autodiscover_tasks()

# For printing debug information about the request
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

CeleryTest/CeleryTest/__init__.py

This is loaded when Django starts. Note that the module imported from here must match the filename celery_config.py.

from .celery_config import app as celery_app
__all__ = ['celery_app'] 

CeleryTest/CeleryTest/settings.py

Configure Celery's CELERY_BROKER_URL and CELERY_RESULT_BACKEND:

CELERY_BROKER_URL = 'redis://localhost'
CELERY_RESULT_BACKEND = 'redis://localhost'

Add tasks

tasks.py in each app

The file must be named tasks.py, otherwise Celery's autodiscovery cannot find it.

@shared_task lets the task be imported from the views of any app; without it, the task can only be used inside its own app.

from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y

Configure urls.py

from django.conf.urls import url
from django.contrib import admin
from app import views

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^index/$', views.index),
]

Add the matching view in views.py

# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.shortcuts import render, HttpResponse

# Create your views here.

from app import tasks

def index(request):
    res = tasks.add.delay(5, 99)
    print("res:", res)
    return HttpResponse(res.task_id)

Start and test

Migrate the database

python manage.py migrate 

Start the services

python manage.py runserver
celery -A CeleryTest worker -l info

The frontend receives this task_id and can poll an endpoint with it via periodic AJAX requests to fetch the result; this way the task becomes asynchronous from the client's point of view.

Add the result endpoint

urls.py

url(r'^task_res/$', views.task_res),

views.py

from celery.result import AsyncResult

def task_res(request):
    task_id = request.GET.get('task_id')
    result = AsyncResult(id=task_id)
    return HttpResponse(result.get())
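
With the services from the previous step running, here is a minimal client-side sketch of the whole flow (requests is a third-party HTTP library; host and port assume the default runserver):

import requests

# kick off the task; the index view returns the task_id
task_id = requests.get('http://127.0.0.1:8000/index/').text
# fetch the result with that id (the view blocks until the task finishes)
print(requests.get('http://127.0.0.1:8000/task_res/',
                   params={'task_id': task_id}).text)  # 104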

Periodic tasks with django-celery-beat

Installation

pip install django_celery_beat

Configuration

Add django_celery_beat to settings.py:

INSTALLED_APPS = [
    ...
    "django_celery_beat"
]

Migrate the database schema

python manage.py migrate

Create a superuser

$ python manage.py createsuperuser
Username (leave blank to use 'root'): why
Email address: why@whysdomain.com
Password: 
Password (again): 
Superuser created successfully.

Start the server directly

python manage.py runserver

Configure the periodic tasks

Go straight to the /admin/ URL; there you will find:

  • Crontabs: cron-style schedules, like crontab
  • Intervals: simple recurring schedules, e.g. run every few minutes
  • Periodic tasks: the tasks to be executed
  • Solar events: schedules based on sunrise, sunset, dawn, and dusk

Crontabs

You can set Minute, Hour, Day of week, Day of month, and Month of year, exactly as in crontab.

Intervals

Choose Every and Period to define how often the task should run.

Periodic tasks

Note that the task arguments here must be given in JSON format.
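
Schedules can also be created programmatically rather than through the admin; a minimal sketch using the django_celery_beat models (the task path app.tasks.add comes from the earlier section):

import json
from django_celery_beat.models import IntervalSchedule, PeriodicTask

# run app.tasks.add(4, 4) every 10 seconds; args must be JSON, as in the admin form
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=10, period=IntervalSchedule.SECONDS)
PeriodicTask.objects.create(
    interval=schedule,
    name='add every 10 seconds',
    task='app.tasks.add',
    args=json.dumps([4, 4]),
)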

Start Celery

celery -A CeleryTest beat -l info -S django
celery -A CeleryTest worker -l info

celery beat now reads the schedule from the Django database and hands the tasks to the workers for execution, just as it previously did from the local celerybeat-schedule file.