Pthon任务调度
Python 中常用的定时任务库包括:
- APScheduler - 功能强大,支持多种调度方式,易于使用。
- Celery - 适用于分布式任务队列,可以与多种消息队列(如RabbitMQ、Redis)配合使用。
- Schedule - 简单易用,适合简单的定时任务。
- Django-Cron - 专门为 Django 框架设计的定时任务库。
- Kronos - 另一个轻量级的定时任务库,适用于简单的周期性任务。
这些库各有特点,可以根据具体需求选择合适的库。
好的,下面是每个库的一个简单示例:
1. APScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
def job():
print("定时任务执行中...")
try:
scheduler = BlockingScheduler()
scheduler.add_job(job, 'interval', seconds=5) # 每5秒执行一次
# 使用 Cron 表达式添加任务
# 这个表达式表示每分钟的第 0 秒执行一次
scheduler.add_job(job, 'cron', minute='*/1', second='0')
scheduler.start()
except (KeyboardInterrupt, SystemExit):
# 捕获中断信号(如 Ctrl+C),并停止调度器
print("接收到中断信号,正在停止调度器...")
scheduler.shutdown()
print("调度器已停止。")
解释
- 创建调度器:使用 BlockingScheduler 创建一个调度器实例。
- 添加任务:使用 add_job 方法添加一个任务,并指定其触发条件为每分钟的第 0 秒。
- 启动调度器:调用 scheduler.start() 启动调度器。
- 捕获中断信号:使用 try-except 块捕获中断信号(如 KeyboardInterrupt 和 SystemExit)。
- 停止调度器:在捕获到中断信号后,调用 scheduler.shutdown() 方法停止调度器。
注意事项:
- BlockingScheduler 会阻塞主线程,因此你需要在 try-except 块中捕获中断信号来停止调度器。
- 如果你使用的是 BackgroundScheduler 或其他非阻塞调度器,可以在主线程中调用 scheduler.shutdown() 来停止调度器。 例如,使用 BackgroundScheduler 的示例:
from apscheduler.schedulers.background import BackgroundScheduler
import time
def job():
print("定时任务执行中...")
# 创建调度器
scheduler = BackgroundScheduler()
# 使用 Cron 表达式添加任务
# 这个表达式表示每分钟的第 0 秒执行一次
scheduler.add_job(job, 'cron', minute='*/1', second='0')
# 启动调度器
print("调度器已启动...")
scheduler.start()
try:
# 保持主线程运行
while True:
time.sleep(2)
except (KeyboardInterrupt, SystemExit):
# 捕获中断信号(如 Ctrl+C),并停止调度器
print("接收到中断信号,正在停止调度器...")
scheduler.shutdown()
print("调度器已停止。")
在这个示例中,BackgroundScheduler 不会阻塞主线程,因此你需要使用 while True 循环来保持主线程运行,并在捕获到中断信号时停止调度器。
2. Celery
首先需要安装 celery
和 redis
(或其他消息队列),然后配置并运行一个简单的 Celery 任务。
安装依赖:
pip install celery redis
创建 tasks.py
文件:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
# app.conf.beat_schedule = {
# 'run-every-minute': {
# 'task': 'tasks.periodic_task',
# 'schedule': crontab(minute='*/1'), # 每分钟执行一次
# },
# }
@app.task
def periodic_task():
print("Celery 定时任务执行中...")
启动 Celery worker:
celery -A tasks worker --loglevel=info
启动 Celery beat:
celery -A tasks beat --loglevel=info
在 tasks.py
中添加周期性任务:
from celery.schedules import crontab
app.conf.beat_schedule = {
'run-every-5-seconds': {
'task': 'tasks.periodic_task',
'schedule': 5.0, # 每5秒执行一次
},
}
3. Schedule
import schedule
import time
def job():
print("定时任务执行中...")
schedule.every(5).seconds.do(job) # 每5秒执行一次
while True:
schedule.run_pending()
time.sleep(1)
4. Django-Cron
首先需要安装 django-cron
:
pip install django-cron
在 settings.py
中添加 django_cron
到 INSTALLED_APPS
:
INSTALLED_APPS = [
...
'django_cron',
]
CRON_CLASSES = [
'myapp.crons.MyCronJob',
]
创建 myapp/crons.py
文件:
from django_cron import CronJobBase, Schedule
class MyCronJob(CronJobBase):
RUN_EVERY_MINS = 1 # 每分钟执行一次
schedule = Schedule(run_every_mins=RUN_EVERY_MINS)
code = 'myapp.my_cron_job' # 一个唯一标识符
def do(self):
print("Django-Cron 定时任务执行中...")
5. Kronos
首先需要安装 kronos
:
pip install kronos
在 settings.py
中添加 kronos
到 INSTALLED_APPS
:
INSTALLED_APPS = [
...
'kronos',
]
创建 cron.py
文件:
from kronos import register, every
@register(every('5.seconds'))
def my_cron_job():
print("Kronos 定时任务执行中...")
这些示例展示了如何使用不同的库来实现简单的定时任务。你可以根据具体需求选择合适的库和配置。