原文: Python定时任务8种实现方式
使用 threading.Timer 实现
threading.Timer类允许你安排一个函数在指定时间后执行。
它通过创建一个新的线程来实现 ,使得定时任务不会阻塞程序的其他部分。
1
2
3
4
5
6
7
8
|
import threading
def hello():
print("Hello, this is a timed message!")
# 创建定时器 ,5秒后执行hello函数
t = threading.Timer(5.0, hello)
t.start() # 开始计时
|
每5秒执行一次hello函数。
定时任务管理与取消策略
threading.Timer对象提供了cancel()方法来取消尚未启动的任务:
1
2
3
4
5
6
7
8
9
10
11
|
t = threading.Timer(10.0, hello) # 改为10秒后执行
t.start()
# 假设在5秒后决定取消这个定时任务
import time
time.sleep(5)
if t.is_alive(): # 检查定时器是否还在运行
t.cancel()
print("The timer has been cancelled.")
else:
print("The timer has already executed.")
|
Schedule库自动化
安装
使用示例:
1
2
3
4
5
6
7
8
9
10
11
12
|
import schedule
import time
def job():
print("Hello, Schedule!")
# 每隔10秒执行一次job函数
schedule.every(10).seconds.do(job)
while True:
schedule.run_pending()
time.sleep(1)
|
特定时间执行,及任务异常处理
1
2
3
4
5
6
7
8
|
def safe_job():
try:
# 假设这是可能会抛出异常的代码
...
except Exception as e:
print(f"An error occurred: {e}")
schedule.every().day.at("10:30").do(saft_job)
|
指定每天10:30执行,执行任务内使用try-except捕获异常,保护定时任务不会因为异常崩溃。
APScheduler框架
APScheduler(Advanced Python Scheduler)是一个功能全面且高度可定制的定时任务库,适用于Python 3.5及更高版本。它支持cron风格的时间表达式、日期时间以及固定间隔等多种触发方式,同时具备持久化任务、错误处理、任务存储插件等高级特性,非常适合构建复杂且健壮的定时任务系统。
安装:
1
|
pip install apscheduler
|
使用示例:
1
2
3
4
5
6
7
8
9
|
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def my_job():
print("Job executed at " + str(datetime.datetime.now()))
scheduler = BlockingScheduler()
scheduler.add_job(my_job, 'interval', seconds=5)
scheduler.start()
|
每5秒执行一次my_job函数。
不同类型的触发器介绍
APScheduler提供了多种触发器来适应不同场景的需求:
- IntervalTrigger:用于在固定时间间隔后重复执行任务。例如,每隔5秒执行一次。
1
2
3
4
|
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
scheduler.add_job(my_job, 'interval', seconds=5)
|
- CronTrigger:类似于Linux的cron,支持复杂的周期性计划,如每天凌晨1点执行。
1
2
3
4
5
6
|
from apscheduler.triggers.cron import CronTrigger
from apscheduler.schedulers.blocking import BlockingScheduler
trigger = CronTrigger(hour=1, minute=0) # 每天凌晨1点执行
scheduler = BlockingScheduler()
scheduler.add_job(my_job, trigger)
|
- DateTrigger:在特定的日期和时间只执行一次。
1
2
3
4
5
6
|
from apscheduler.triggers.date import DateTrigger
from apscheduler.schedulers.blocking import BlockingScheduler
trigger = DateTrigger(run_date=datetime.datetime(2023, 4, 1)) # 在2023年4月1日执行
scheduler = BlockingScheduler()
scheduler.add_job(my_job, trigger)
|
这些触发器使得APScheduler能够灵活适应各种定时任务需求,从简单的定时提醒到复杂的业务逻辑定时处理,都能轻松应对。通过合理选择触发器类型和配置 ,可以构建出既高效又可靠的定时任务解决方案。
基于Unix Cron的Python实现
Linux Crontab语法
Cron是Unix/Linux系统中用于定时执行任务的强大工具。Crontab文件包含了定时任务的列表,每个任务都遵循一个特定的语法来定义何时执行。Cron语法由六个字段组成,分别代表分钟、小时、一天中的哪一天、月份、一周中的哪一天和命令。字段之间用空格分隔,其格式如下:
1
2
3
4
5
6
7
8
|
* * * * * command_to_execute
- - - - -
| | | | |
| | | | +--- 星期几 (0 - 6) ,0 和 7 都代表周日
| | | +----- 月份 (1 - 12)
| | +------- 一个月中的哪一天 (1 - 31)
| +--------- 小时 (0 - 23)
+----------- 分钟 (0 - 59)
|
特殊符号含义:
Python脚本与系统Cron结合
在crontab -e中编辑配置定时任务,定时执行python脚本。
1
|
0 6 * * * /path/to/your/my_script.py
|
Tornado异步定时任务
Tornado是一个Python网络库,以其异步非阻塞IO和高并发能力著称,特别适合构建高性能的Web服务和实时应用。其核心在于IOLoop ,这是一个事件循环 ,负责监听和处理I/O事件,如网络连接、超时、信号等,以及调度回调函数。
Tornado的IOLoop提供了call_later、call_at等方法,可以方便地安排一次性或周期性的定时任务。这些方法不阻塞主事件循环,因此非常适合在异步环境中执行定时操作。
使用IOLoop.call_later在5秒后执行一个函数:
1
2
3
4
5
6
7
8
|
import tornado.ioloop
import time
def print_time():
print("Current time:", time.ctime())
tornado.ioloop.IOLoop.current().call_later(5, print_time) # 5秒后执行print_time
tornado.ioloop.IOLoop.current().start()
|
周期性定时任务示例
若要实现周期性执行,可以结合call_later和函数自身调用来重新安排下一次执行。下面是一个每两秒打印时间的示例:
1
2
3
4
5
6
7
8
9
|
import tornado.ioloop
import time
def periodic_callback():
print("Current time:", time.ctime())
tornado.ioloop.IOLoop.current().call_later(2, periodic_callback) # 每2秒调用一次
tornado.ioloop.IOLoop.current().call_later(2, periodic_callback) # 首次调度
tornado.ioloop.IOLoop.current().start()
|
通过上述示例 ,可以看出Tornado的IOLoop为异步定时任务提供了一种高效、灵活的解决方案,尤其适合那些需要在高性能服务器端应用中集成定时处理逻辑的场景。
Celery分布式任务队列
Celery是一款广泛使用的分布式任务队列系统,适用于处理大量消息和后台任务,特别是那些需要异步处理或定时执行的任务。它支持多种消息中间件(如RabbitMQ、Redis)作为任务代理,确保了任务的可靠传递与高可用性。
安装
1
|
pip install celery[redis] # 以Redis为消息中间件安装
|
定时任务调度与监控
Celery提供了丰富的定时任务功能,主要通过beat服务和crontab风格的调度表达式来实现。
定时任务配置
在项目目录下创建一个celery.py文件,配置定时任务:
1
2
3
4
5
6
|
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def send_email_report():
print("Sending email report...")
|
然后,在另一个文件中配置定时任务调度(例如在tasks.py中):
1
2
3
4
5
6
7
8
9
|
from celery.schedules import crontab
from .celery import app
app.conf.beat_schedule = {
'send-email-every-day': {
'task': 'tasks.send_email_report',
'schedule': crontab(hour=0, minute=0), # 每天凌晨执行
},
}
|
运行Celery服务
启动Celery worker处理任务:
1
|
celery -A tasks worker --loglevel=info
|
同时,启动beat服务以驱动定时任务:
1
|
celery beat -A tasks --loglevel=info
|
监控
Celery提供了实时监控工具Flower,可以查看任务状态、进度和结果。安装Flower并运行:
1
2
|
pip install flower
flower -A tasks --port=5555
|
访问http://localhost:5555即可查看监控界面。
通过上述步骤,你可以有效地在Python应用中集成Celery实现分布式定时任务处理 ,不仅提高了任务处理的效率和可靠性,还便于监控和管理。
使用asyncio实现异步定时
在Python中,asyncio是一个用于编写并发代码的库,利用协程(coroutine)、事件循环(event loop)和任务(task)等概念 ,实现了异步IO操作。协程是一种轻量级的子例程,可以在等待IO操作(如网络请求、磁盘读写)期间挂起,从而使其他任务得以执行,提高了程序的效率和响应速度。
asyncio实现定时执行案例
要使用asyncio来实现定时任务,通常涉及asyncio.sleep()函数配合事件循环。下面是一个简单的定时打印消息的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import asyncio
async def print_every_three_seconds():
while True:
print("This message is printed every three seconds.")
await asyncio.sleep(3)
async def main():
# 创建一个任务并启动
task = asyncio.create_task(print_every_three_seconds())
# 保持主程序运行,否则事件循环一旦执行完就会结束
await asyncio.sleep(10) # 示例中让主程序等待10秒后自然结束
# 运行主函数
asyncio.run(main())
|
这段代码定义了一个异步函数print_every_three_seconds,它会无限循环每隔3秒打印一次消息。main函数中通过asyncio.create_task创建了这个任务并在事件循环中启动。为了使主程序维持运行一段时间以观察输出效果,我们使用了await asyncio.sleep(10)。实际上,生产环境中可能需要根据具体需求调整或采用更复杂的机制来控制主循环的生命周期。
请注意,实际应用中可能需要考虑任务的取消、异常处理等高级特性,以确保程序的健壮性。
基于Django的CronJobs管理后台
Django-Cron是专为Django框架设计的库,旨在简化定时任务的管理和调度 ,特别是将其集成到Django的管理后台。它允许开发者通过Django的Admin界面来创建、编辑和删除定时任务 ,而无需直接操作服务器的cron作业。这样 ,定时任务的管理变得更加直观和便捷,特别适合Django项目的维护团队协作。
安装Django-Cron
1
|
pip install django-cron
|
使用示例:
1
2
3
4
5
6
7
8
9
10
|
from django_cron import CronJobBase, Schedule
class MyCronJob(CronJobBase):
RUN_EVERY_MINS = 1 # 每分钟运行一次
schedule = Schedule(run_every_mins=RUN_EVERY_MINS)
code = 'my_cron.my_cron_job' # 任务唯一标识
def do(self):
print("Cron Job Running")
# 在这里执行你的任务逻辑
|
通常 ,Django-Cron通过管理命令python manage.py runcrons来执行任务。