Celery 异步框架执行定时任务
Celery是什么?
-
Celery是Python开发的简单、灵活可靠的、处理大量消息的分布式任务调度模块 专注于实时处理的异步任务队列 同时也支持任务调度 可做异步任务,可做定时任务
celery是python的一个异步框架,架构如下:
Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成.
启动celery服务(配置好broker和backend),手动或自动添加任务到broker中,broker将任务分发给worker,worker后台异步执行任务,执行完成后将结果存放到backend中
一.celery环境搭建:
1.安装celery:
# 已安装Python环境 pip3 install celery # 验证安装 查看版本 celery --version
2.安装redis(也可以使用rabbit MQ)
使用redis作为backend和broker,需要本机安装redis,并启动redis-server,还需要安装Python第三方模块redis
pip install redis
二.celery使用:
2.1 celery项目目录结构:
(venv) jeanettian@KP-MBP-1179 src % tree celery_task -I __pycache__ celery_task ├── __init__.py ├── celery.py ├── celery_config.py ├── task.py └── test_celery.py
注意:此处定义的celery工程名为celery_task,celeryt_task下必须要有__init__.py文件.
2.1.1 celery.py文件内容:
from __future__ import absolute_import from celery import Celery app = Celery("celery_task.task", # redis作为消息中间件,broker用来存储待执行的任务 broker="redis://127.0.0.1:6379/4", # backend用来存储执行结果回调 backend="redis://127.0.0.1:6379/5", # 注意必须要有include,用来指定任务 include=[celery_task.task] ) app.config_from_object("celery_task.celery_config")
2.1.2 celery_config.py文件内容:
from datetime import timedelta from celery.schedules import crontab task_serializer = json result_serializer = json accept_content = [json] # 时区设置 timezone = Asia/Shanghai # celery默认开启自己的日志 # False表示不关闭 worker_hijack_root_logger = False # 存储结果过期时间,过期后自动删除 # 单位为秒 result_expires = 60 * 60 * 24 # 需要执行任务的配置 beat_schedule = { celery_task.tasks: { task: celery_task.task.add, schedule: timedelta(seconds=20), # 启动每20s执行一次 args: (2, 2) # 此处的args参数会传给定时任务指定函数的形参。 }, # celery_app.tasks: { # task: celery_app.task.task2, # schedule: crontab(minute=*/2), # args: () # } }
2.1.3 task.py文件内容:
from .celery import app @app.task def add(x, y): print("+" * 50) return f"result is: {x+y}" @app.task def mul(x, y): print("-" * 50) return x - y @app.task def sum_x(numbers): print("*" * 50) return sum(numbers)
2.1.4 test_celery.py文件内容:
from .task import add, mul, sum_x add.delay() mul.delay(2, 2) sum_x.delay([2, 2, 2, 2])
2.2 进入celery_task工程所在目录,执行命令启动worker执行定时任务
(此命令为即为多workers并发执行,几个wokers,就开几个bash执行此命令):
celery -A celery_task worker -l INFO
2.3 另开一个bash,进入celery_task工程所在目录,执行命令启动心跳监听分发任务:
celery -A celery_task beat
2.4 查看任务执行结果:
每隔20s执行一次任务.
上一篇:
Python 安装包管理工具 pip