schedule是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间。
如果想执行多个任务,也可以添加多个task。

首先安装schedule库:

pip install schedule

1、按时间间隔执行定时任务

示例代码1:

import schedulefrom datetime import datetimedef task():    now = datetime.now()    ts = now.strftime("%Y-%m-%d %H:%M:%S")    print(ts)def task2():    now = datetime.now()    ts = now.strftime("%Y-%m-%d %H:%M:%S")    print(ts + '666!')def func():    # 清空任务    schedule.clear()    # 创建一个按3秒间隔执行任务    schedule.every(3).seconds.do(task)    # 创建一个按2秒间隔执行任务    schedule.every(2).seconds.do(task2)    while True:        schedule.run_pending()func()

运行结果:

示例代码2:

import scheduleimport timedef job(name):    print("her name is : ", name)name = "张三"schedule.every(10).minutes.do(job, name)schedule.every().hour.do(job, name)schedule.every().day.at("10:30").do(job, name)schedule.every(5).to(10).days.do(job, name)schedule.every().monday.do(job, name)schedule.every().wednesday.at("13:15").do(job, name)while True:    schedule.run_pending()    time.sleep(1)

参数解释:

  • 每隔十分钟执行一次任务
  • 每隔一小时执行一次任务
  • 每天的10:30执行一次任务
  • 每隔5到10天执行一次任务
  • 每周一的这个时候执行一次任务
  • 每周三13:15执行一次任务
  • run_pending:运行所有可以运行的任务

注意:schedule方法是串行的,也就是说,如果各个任务之间时间不冲突,那是没问题的;如果时间有冲突的话,会串行的执行命令。

2、装饰器:通过 @repeat() 装饰静态方法

示例代码:

from datetime import datetimefrom schedule import every, repeat, run_pending@repeat(every(3).seconds)def task():    now = datetime.now()    ts = now.strftime("%Y-%m-%d %H:%M:%S")    print(ts + '-333!')@repeat(every(5).seconds)def task2():    now = datetime.now()    ts = now.strftime("%Y-%m-%d %H:%M:%S")    print(ts + "-555555!")while True:    run_pending()

运行结果:

3、传递参数

示例代码:

from datetime import datetimeimport scheduledef task(s):    now = datetime.now()    ts = now.strftime("%Y-%m-%d %H:%M:%S")    print(ts + s)def task2(s):    now = datetime.now()    ts = now.strftime("%Y-%m-%d %H:%M:%S")    print(ts + s)schedule.every(3).seconds.do(task, s='-333')schedule.every(5).seconds.do(task, s='-555')while True:    schedule.run_pending()

运行结果:

4、使用装饰器传递参数

示例代码:

from datetime import datetimefrom schedule import every, repeat, run_pending@repeat(every(3).seconds, '-333')def task(s):    now = datetime.now()    ts = now.strftime("%Y-%m-%d %H:%M:%S")    print(ts + s)@repeat(every(5).seconds, '-555')def task2(s):    now = datetime.now()    ts = now.strftime("%Y-%m-%d %H:%M:%S")    print(ts + s)while True:    run_pending()

运行结果:

5、取消定时任务

示例代码:

import schedulei = 0def some_task():    global i    i += 1    print(i)    if i == 5:        schedule.cancel_job(job)        print('cancel job')        exit(0)job = schedule.every().second.do(some_task)while True:    schedule.run_pending()

运行结果:

6、在指定时间执行一次任务

示例代码:

import timeimport scheduledef job_that_executes_once():    print('Hello')    return schedule.CancelJobschedule.every().minute.at(':30').do(job_that_executes_once)while True:    schedule.run_pending()    time.sleep(1)

运行结果:

7、根据标签检索任务

示例代码:

# 检索所有任务:schedule.get_jobs()import scheduledef greet(name):    print('Hello {}'.format(name))schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')friends = schedule.get_jobs('friend')print(friends)

运行结果:

8、根据标签取消任务

示例代码:

# 取消所有任务:schedule.clear()import scheduledef greet(name):    print('Hello {}'.format(name))    if name == 'Cancel':        schedule.clear('second-tasks')        print('cancel second-tasks')schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')while True:    schedule.run_pending()

运行结果:

9、运行任务到某时间

示例代码:

import schedulefrom datetime import datetime, timedelta, timedef job():    print('working...')schedule.every().second.until('23:59').do(job)  # 今天23:59停止schedule.every().second.until('2030-01-01 18:30').do(job)  # 2030-01-01 18:30停止schedule.every().second.until(timedelta(hours=8)).do(job)  # 8小时后停止schedule.every().second.until(time(23, 59, 59)).do(job)  # 今天23:59:59停止schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)  # 2030-01-01 18:30停止while True:    schedule.run_pending()

运行结果:

10、马上运行所有任务(主要用于测试)

示例代码:

import scheduledef job():    print('working...')def job1():    print('Hello...')schedule.every().monday.at('12:40').do(job)schedule.every().tuesday.at('16:40').do(job1)schedule.run_all()schedule.run_all(delay_seconds=3)  # 任务间延迟3秒

运行结果:

11、并行运行:使用 Python 内置队列实现

示例代码:

import threadingimport timeimport scheduledef job1():    print("I'm running on thread %s" % threading.current_thread())def job2():    print("I'm running on thread %s" % threading.current_thread())def job3():    print("I'm running on thread %s" % threading.current_thread())def run_threaded(job_func):    job_thread = threading.Thread(target=job_func)    job_thread.start()schedule.every(10).seconds.do(run_threaded, job1)schedule.every(10).seconds.do(run_threaded, job2)schedule.every(10).seconds.do(run_threaded, job3)while True:    schedule.run_pending()    time.sleep(1)

运行结果: