python定时任务模块APScheduler
2023-09-14 08:59:07 时间
一、简单任务
定义一个函数,然后定义一个scheduler类型,添加一个job,然后执行,就可以了
5秒整倍数,就执行这个函数
# coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def aps_test(): print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), '你好') scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, trigger='cron', second='*/5') scheduler.start()
带参数的
# coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def aps_test(x): print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('你好',), trigger='cron', second='*/5') scheduler.start()
# coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def aps_test(x): print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5') scheduler.add_job(func=aps_test, args=('一次性任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12)) scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3) scheduler.start()
二、日志
# coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='log1.txt', filemode='a') def aps_test(x): print 1/0 print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5') scheduler._logger = logging scheduler.start()
三、删除任务
要求执行一定阶段任务以后,删除某一个循环任务,其他任务照常进行。有如下代码:
# coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='log1.txt', filemode='a') def aps_test(x): print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) def aps_date(x): scheduler.remove_job('interval_task') print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5', id='cron_task') scheduler.add_job(func=aps_date, args=('一次性任务,删除循环任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id='date_task') scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task') scheduler._logger = logging scheduler.start()
四、停止任务,恢复任务
# coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='log1.txt', filemode='a') def aps_test(x): print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) def aps_pause(x): scheduler.pause_job('interval_task') print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) def aps_resume(x): scheduler.resume_job('interval_task') print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5', id='cron_task') scheduler.add_job(func=aps_pause, args=('一次性任务,停止循环任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id='pause_task') scheduler.add_job(func=aps_resume, args=('一次性任务,恢复循环任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=24), id='resume_task') scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task') scheduler._logger = logging scheduler.start()
五、捕获错误
# coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR import datetime import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='log1.txt', filemode='a') def aps_test(x): print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) def date_test(x): print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) print (1/0) def my_listener(event): if event.exception: print ('任务出错了!!!!!!') else: print ('任务照常运行...') scheduler = BlockingScheduler() scheduler.add_job(func=date_test, args=('一定性任务,会出错',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task') scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task') scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) scheduler._logger = logging scheduler.start()
六、定时任务的接口设计
# 定时任务功能 @admin.route('/pause', methods=['POST']) @user_login_req def pausetask(): # 停止 data = request.form.get('task_id') job = scheduler.get_job(str(data)) res = {} if job: if 'pause' in job.__str__(): res.update({'status': 1001, 'msg': '已停止'}) else: scheduler.pause_job(str(data)) res.update({'status': 1000, 'msg': '停止中'}) else: res.update({'status': 1001, 'msg': '未运行'}) return jsonify(res) @admin.route('/resume', methods=['POST']) @user_login_req def resumetask(): # 恢复 data = request.form.get('task_id') job=scheduler.get_job(str(data)) res={} if job: if 'run' in job.__str__(): res.update({'status':1001,'msg':'已恢复'}) else: scheduler.resume_job(str(data)) res.update({'status': 1000, 'msg': '恢复中'}) else: res.update({'status':1001,'msg':'未运行'}) return jsonify(res) @admin.route('/remove_task', methods=['POST']) @user_login_req def remove_task(): # 移除 data = request.form['task_id'] job = scheduler.get_job(str(data)) res = {} if not job: res.update({'status': 1001, 'msg': '已删除'}) else: scheduler.remove_job(str(data)) res.update({'status': 1000, 'msg': '删除中'}) return jsonify(res) @admin.route('/addjob', methods=['POST']) @user_login_req def addtask(): data = request.form.get('task_id') job = scheduler.get_job(str(data)) if job: return jsonify({'status': 1001,'msg':'已开启'}) if data == '1': scheduler.add_job(func=task1, id='1', trigger='cron', day_of_week='0-6', hour=18, minute=19, second=10, replace_existing=True) # trigger='cron' 表示是一个定时任务 else: scheduler.add_job(func=task2, id='2', trigger='interval', seconds=10, replace_existing=True) # trigger='interval' 表示是一个循环任务,每隔多久执行一次 return jsonify({'status': 1000,'msg':'运行中'}) def task1(): print('mession1') print(datetime.datetime.now()) def task2(): print('mession2') print(datetime.datetime.now())
相关文章
- python中的logging模块
- Python 利用pexpect和paramiko模块进行远程服务器的监控
- 大叔经验分享(11)python引入模块报错ImportError: No module named pandas numpy
- Python中random模块生成随机数详解
- python进阶_浅谈面向对象进阶
- Python 进阶(二)模块
- python_json模块和pickle模块
- Python语言学习:Python语言学习之python包/库package的简介(模块的封装/模块路径搜索/模块导入方法/自定义导入模块实现华氏-摄氏温度转换案例应用)、使用方法、管理工具之详细攻略
- Python编程语言学习:python语言中快速查询python自带模块&函数的用法及其属性方法、如何查询某个函数&关键词的用法、输出一个类或者实例化对象的所有属性和方法名之详细攻略
- Python编程语言学习:包导入和模块搜索路径(包路径)简介、使用方法(python系统环境路径的查询与添加)之详细攻略
- Python之tkinter:动态演示调用python库的tkinter带你进入GUI世界(计算器简单功能)
- 已解决2.Set PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python (but this will use pure-Python parsing and wi
- 已解决2. Set PROTOCOL_BUPFERS_PYTHON_iMPLEMENTATION=python (but this will use pure-Python parsing and w
- 〖Python 数据库开发实战 - MongoDB篇⑫〗- MongoDB集合的查询操作
- Python爬虫基础:多进程——multiprocessing模块的使用
- Python图像处理丨带你掌握图像几何变换
- Python编程:twine模块打包python项目上传pypi
- python学习——编码
- 写网络爬虫天然就是择Python而用 python 网络爬虫3
- python里使用asyncore模块
- 多版本Python共存时pip给指定版本的python安装package的方法
- Python PyWin32 模块
- 服务器配置 centos一行命令安装python 3.6.8