"""Periodic job runner that polls a local HTTP service on fixed intervals.

A blocking APScheduler instance drives four interval jobs (flight-data
refresh, table check, PostgreSQL check, backup). Each job calls an endpoint
under ``url`` and prints a timestamped summary of the JSON response.
"""
import datetime
import json
from concurrent.futures import ThreadPoolExecutor as ConcurrentThreadPoolExecutor

import requests
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.executors.pool import ProcessPoolExecutor, ThreadPoolExecutor
from apscheduler.schedulers.background import BlockingScheduler

# Executor configuration.
executors = {
    'default': ThreadPoolExecutor(20),      # default thread pool, 20 threads
    'processpool': ProcessPoolExecutor(5),  # process pool, 5 processes
}

# Default settings applied to every job.
job_defaults = {
    'coalesce': False,         # whether backlogged runs are merged into one
    'max_instances': 3,        # maximum concurrent instances per job
    'misfire_grace_time': 60,  # tolerated lateness of a run, in seconds
}

# Create the scheduler with the executors and defaults above.
scheduler = BlockingScheduler(
    executors=executors,
    job_defaults=job_defaults,
    timezone='Asia/Shanghai',
)

url = 'http://localhost:5001'
headers = {'Content-Type': 'application/json'}

# (connect, read) timeout in seconds for every HTTP call. Without a timeout a
# stalled backend would block a worker thread forever and eventually exhaust
# ``max_instances``.
# NOTE(review): the read timeout is a guess; tune it to the slowest endpoint.
REQUEST_TIMEOUT = (10, 120)

# Rotation counter for getflightdata: two 'B' refreshes, then one 'A'.
num = 1


def _log(message):
    """Print *message* prefixed with the current local timestamp."""
    print(f"【{datetime.datetime.now()}】{message}")


def my_listener(event):
    """Report the outcome of a finished job (success or raised exception)."""
    if event.exception:
        _log(f"多线程异常,错误信息{event.exception}")
    else:
        _log(f"多线程任务执行正常:{event}")


def handle_flight_data_request(req_type):
    """POST a refresh request of the given type to ``/getFlightData2``.

    Args:
        req_type: Value sent as the ``type`` field of the JSON body; the
            callers in this file pass ``'A'`` or ``'B'``.

    Any exception (network failure, timeout, non-JSON body) is caught and
    printed, so this function never raises into the worker thread.
    """
    try:
        response = requests.post(
            url + '/getFlightData2',
            data=json.dumps({'type': req_type}),
            headers=headers,
            timeout=REQUEST_TIMEOUT,
        )
        response_json = response.json()
        _log(f"{req_type}刷新;结果:{response_json}")
    except Exception as e:
        # Best-effort job: report the failure and let the next run retry.
        _log(f"{req_type} 请求执行异常: {e}")


def getflightdata():
    """IO-bound job: check the login state, then trigger an A or B refresh.

    Calls ``/checkLogin`` first. If the returned ``code`` is ``"0"`` the
    refresh is skipped. Otherwise one refresh request is submitted, rotating
    B, B, A via the module-level ``num`` counter.
    """
    global num

    response = requests.get(
        url + '/checkLogin', headers=headers, timeout=REQUEST_TIMEOUT
    )
    # Parse the body once instead of re-decoding it for every field access.
    payload = response.json()
    _log(f"检查账号登录情况;结果为:{payload['code']}{payload['msg']}")

    if payload['code'] == "0":
        _log("AMRO未登录")
        return

    refresh_a = num > 2
    req_type = 'A' if refresh_a else 'B'
    # Leaving the ``with`` block waits for the submitted request to finish,
    # so this job instance ends only after the refresh completes.
    with ConcurrentThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(handle_flight_data_request, req_type)
        _log(f"{req_type}刷新请求;")
        num = 1 if refresh_a else num + 1


def getTable():
    """Fetch ``/table`` and print the last element of its ``data`` list."""
    response = requests.get(
        url + '/table', headers=headers, timeout=REQUEST_TIMEOUT
    )
    _log(f"最近一次刷新为:{response.json()['data'][-1]}")


def postgresqlCheck():
    """Call ``/postgresql`` and print the JSON response."""
    response = requests.get(
        url + '/postgresql', headers=headers, timeout=REQUEST_TIMEOUT
    )
    _log(f"云端计算请求:{response.json()}")


def backupdata():
    """Call ``/bakupdata`` (endpoint spelling as in the original) and print the JSON response."""
    response = requests.get(
        url + '/bakupdata', headers=headers, timeout=REQUEST_TIMEOUT
    )
    _log(f"5min备份请求:{response.json()}")


def cpu_bound_job():
    """Placeholder for a CPU-bound task intended for the process pool."""
    pass


# Register the jobs; all of them run on the default thread-pool executor.
scheduler.add_job(getflightdata, 'interval', seconds=30)
scheduler.add_job(getTable, 'interval', seconds=60)
scheduler.add_job(postgresqlCheck, 'interval', seconds=60)
scheduler.add_job(backupdata, 'interval', minutes=5)
# To run the CPU-bound placeholder on the process pool, enable:
# scheduler.add_job(cpu_bound_job, 'interval', seconds=30, executor='processpool')

print('开启定时任务')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# Blocks the calling thread until the scheduler is shut down.
scheduler.start()