import datetime
import json
from concurrent.futures import ThreadPoolExecutor as ConcurrentThreadPoolExecutor

import requests
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler  # BlockingScheduler lives in .blocking, not .background
# Configure executors
executors = {
    'default': ThreadPoolExecutor(20),     # default thread pool, 20 threads
    'processpool': ProcessPoolExecutor(5)  # process pool, 5 processes
}

# Default job settings
job_defaults = {
    'coalesce': False,          # whether to merge runs when a job has piled up
    'max_instances': 3,         # maximum number of concurrently running instances per job
    'misfire_grace_time': 60    # grace period (seconds) for runs that fire late
}

# Create the scheduler with the executors and defaults above
scheduler = BlockingScheduler(
    executors=executors,
    job_defaults=job_defaults,
    timezone='Asia/Shanghai'
)
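# Note: BlockingScheduler.start() takes over the calling thread, which suits this
# script because nothing else runs after start(). If the scheduler had to run
# alongside other application code, BackgroundScheduler from
# apscheduler.schedulers.background would be the usual alternative.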
def my_listener(event):
    if event.exception:
        print(f"【{datetime.datetime.now()}】job raised an exception: {event.exception}")
    else:
        print(f"【{datetime.datetime.now()}】job executed normally: {event}")

url = 'http://localhost:5001'
headers = {'Content-Type': 'application/json'}
num = 1
# Handler for the A/B refresh requests (factored out so it can be submitted to a thread pool)
def handle_flight_data_request(req_type):
    try:
        response = requests.post(url + '/getFlightData2', data=json.dumps({'type': req_type}), headers=headers)
        response_json = response.json()
        print(f"【{datetime.datetime.now()}】{req_type} refresh; result: {response_json}")
    except Exception as e:
        print(f"【{datetime.datetime.now()}】{req_type} request failed: {str(e)}")
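# Note (assumption, not in the original): the requests.post/get calls in this
# script have no timeout, so a hung backend can block a worker thread
# indefinitely. Passing e.g. timeout=10 to each call is a common safeguard.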
def getflightdata():
    """IO-bound job on the default thread pool: check login, then fire an A/B refresh request."""
    # print('getflightdata time:{}'.format(datetime.datetime.now()))
    global num
    response = requests.get(url + '/checkLogin', headers=headers)
    result = response.json()
    print(f"【{datetime.datetime.now()}】account login check; result: {result['code']}{result['msg']}")
    if result['code'] == "0":
        print(f"【{datetime.datetime.now()}】AMRO is not logged in")
    else:
        with ConcurrentThreadPoolExecutor(max_workers=2) as executor:
            if num > 2:
                # submit the A request asynchronously
                executor.submit(handle_flight_data_request, 'A')
                print(f"【{datetime.datetime.now()}】A refresh request submitted;")
                num = 1
            else:
                # submit the B request asynchronously
                executor.submit(handle_flight_data_request, 'B')
                print(f"【{datetime.datetime.now()}】B refresh request submitted;")
                num += 1
'''
Legacy synchronous version, kept for reference:
if num > 2:
    getFlightDataA = requests.post(url + '/getFlightData2', data=json.dumps({'type': 'A'}), headers=headers)
    # result = getFlightDataA.json()
    print(f"【{datetime.datetime.now()}】A request refresh; result: {getFlightDataA.json()}")
    num = 1
else:
    getFlightDataB = requests.post(url + '/getFlightData2', data=json.dumps({'type': 'B'}), headers=headers)
    print(f"【{datetime.datetime.now()}】B request refresh; result: {getFlightDataB.json()}")
    num += 1
'''
def getTable():
    response = requests.get(url + '/table', headers=headers)
    print(f"【{datetime.datetime.now()}】most recent refresh: {response.json()['data'][-1]}")

def postgresqlCheck():
    # print('postgresqlCheck time:{}'.format(datetime.datetime.now()))
    response = requests.get(url + '/postgresql', headers=headers)
    print(f"【{datetime.datetime.now()}】cloud computation request: {response.json()}")

def backupdata():
    # print('backupdata time:{}'.format(datetime.datetime.now()))
    response = requests.get(url + '/bakupdata', headers=headers)  # endpoint spelling kept as defined by the backend
    print(f"【{datetime.datetime.now()}】5-minute backup request: {response.json()}")

def cpu_bound_job():
    """CPU-bound job placeholder; would run on the process pool executor."""
    pass
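# --- Illustrative sketch, not part of the original script ---
# What a CPU-bound job routed to the 'processpool' executor could look like;
# the sum-of-squares workload is a placeholder assumption. It must be a
# module-level function so it can be pickled into the worker process.
def example_cpu_bound_job():
    total = sum(i * i for i in range(5_000_000))  # stand-in for heavy computation
    print(f"【{datetime.datetime.now()}】CPU-bound job finished, result: {total}")
# It could be scheduled similarly to the commented-out add_job line below, e.g.:
# scheduler.add_job(example_cpu_bound_job, 'interval', seconds=30, executor='processpool')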
# Add jobs; all use the default thread-pool executor unless an executor is named
scheduler.add_job(getflightdata, 'interval', seconds=30)
scheduler.add_job(getTable, 'interval', seconds=60)
scheduler.add_job(postgresqlCheck, 'interval', seconds=60)
scheduler.add_job(backupdata, 'interval', minutes=5)
# scheduler.add_job(cpu_bound_job, 'interval', seconds=30, executor='processpool')

print('Starting scheduled jobs')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler.start()
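# Note (assumption, not in the original): start() blocks until the process is
# interrupted. A common pattern is to wrap it in try/except
# (KeyboardInterrupt, SystemExit) and call scheduler.shutdown() so jobs are
# cleaned up gracefully on Ctrl+C.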