"""Periodic polling client driven by APScheduler.

Registers four interval jobs that call a local HTTP service (``url``) with
HMAC-signed headers and print the JSON results, then starts a blocking
scheduler. Importing or running this module starts the scheduler.
"""
import datetime
import hashlib
import hmac
import json
import time
from concurrent.futures import ThreadPoolExecutor as ConcurrentThreadPoolExecutor

import requests
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.executors.pool import ProcessPoolExecutor, ThreadPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler

# Executor configuration.
executors = {
    'default': ThreadPoolExecutor(20),  # default thread pool, 20 threads
    'processpool': ProcessPoolExecutor(5),  # process pool, 5 processes
}

# Default job parameters.
job_defaults = {
    'coalesce': False,  # whether to merge backlogged runs of a job into one
    'max_instances': 3,  # maximum concurrently running instances per job
    'misfire_grace_time': 60,  # tolerated lateness of a run, in seconds
}

# Create the scheduler with the executors and defaults above.
scheduler = BlockingScheduler(
    executors=executors,
    job_defaults=job_defaults,
    timezone='Asia/Shanghai',
)

# Key used to sign requests.
# NOTE(review): the secret is hard-coded in source; consider loading it from
# the environment or a secret store instead of committing it.
SIGNATURE_KEY = b"secret-key-neverout"

# (connect, read) timeout in seconds applied to every HTTP call so that a
# stalled server cannot block a scheduler worker thread forever.
# NOTE(review): tune the read timeout if an endpoint legitimately runs longer.
REQUEST_TIMEOUT = (5, 60)


def generate_signature():
    """Build the timestamp/signature pair used to sign a request.

    The signed message is the key-sorted JSON document
    ``{"timestamp": <ts>}`` followed by ``|`` and the same timestamp,
    authenticated with HMAC-SHA256 under ``SIGNATURE_KEY``.

    :return: ``(timestamp, signature)`` where ``timestamp`` is the current
        Unix time in whole seconds as a string and ``signature`` is the
        hex-encoded digest.
    """
    # Current Unix timestamp, second resolution.
    timestamp = str(int(time.time()))
    # sort_keys=True keeps the serialization stable so signatures match.
    payload = json.dumps({"timestamp": timestamp}, sort_keys=True)
    # Message layout: <json payload>|<timestamp>
    message = f"{payload}|{timestamp}".encode('utf-8')
    signature = hmac.new(SIGNATURE_KEY, message, hashlib.sha256).hexdigest()
    return timestamp, signature


def _signed_headers():
    """Return JSON request headers carrying a fresh timestamp and signature."""
    timestamp, signature = generate_signature()
    return {
        'Content-Type': 'application/json',
        'X-Timestamp': timestamp,
        'X-Signature': signature,
    }


def _log(message):
    """Print ``message`` prefixed with the current local time in 【】 brackets."""
    print(f"【{datetime.datetime.now()}】{message}")


def my_listener(event):
    """Scheduler listener: report whether a job run raised or finished.

    :param event: APScheduler job event; its ``exception`` attribute is
        truthy when the job raised.
    """
    if event.exception:
        _log(f"多线程异常,错误信息{event.exception}")
    else:
        _log(f"多线程任务执行正常:{event}")


url = 'http://localhost:5001'
# Unsigned default headers; not referenced by any job in this file, kept so
# the module-level name stays available.
headers = {'Content-Type': 'application/json'}
# Cycle counter used by getflightdata() to pick between 'A' and 'B' requests.
num = 1


def handle_flight_data_request(req_type):
    """POST a signed flight-data refresh of the given type and print the result.

    Any exception is caught and printed instead of raised: this function is
    run through an executor whose future is never inspected, so an escaping
    exception would otherwise go unreported.

    :param req_type: value sent as the ``type`` field of the JSON body
        (``'A'`` or ``'B'`` in this file).
    """
    try:
        response = requests.post(
            url + '/getFlightData2',
            data=json.dumps({'type': req_type}),
            headers=_signed_headers(),
            timeout=REQUEST_TIMEOUT,
        )
        response_json = response.json()
        _log(f"{req_type}刷新;结果:{response_json}")
    except Exception as e:
        _log(f"{req_type} 请求执行异常: {str(e)}")


def getflightdata():
    """I/O-bound job: check login state, then trigger an 'A' or 'B' refresh.

    Calls ``/checkLogin``; when the returned ``code`` equals ``"0"`` it only
    reports that AMRO is not logged in. Otherwise it issues a ``'B'``
    refresh while the module-level counter ``num`` is at most 2
    (incrementing it) and an ``'A'`` refresh once ``num`` exceeds 2
    (resetting it to 1), i.e. two 'B' refreshes followed by one 'A'.

    Exceptions from the login check propagate to the scheduler and are
    reported by ``my_listener``.
    """
    global num
    response = requests.get(
        url + '/checkLogin',
        headers=_signed_headers(),
        timeout=REQUEST_TIMEOUT,
    )
    # Parse the body once instead of re-decoding it for every field.
    payload = response.json()
    _log(f"检查账号登录情况;结果为:{payload['code']}{payload['msg']}")
    if payload['code'] == "0":
        _log("AMRO未登录")
        return

    # The request runs on a worker thread, so the "request sent" line below is
    # printed without waiting for the response. Leaving the ``with`` block
    # still waits for the submitted task to finish before this job returns.
    with ConcurrentThreadPoolExecutor(max_workers=2) as executor:
        if num > 2:
            executor.submit(handle_flight_data_request, 'A')
            _log("A刷新请求;")
            num = 1
        else:
            executor.submit(handle_flight_data_request, 'B')
            _log("B刷新请求;")
            num += 1


def getTable():
    """Fetch ``/table`` and print its last ``data`` entry, or ``msg`` if empty."""
    response = requests.get(
        url + '/table',
        headers=_signed_headers(),
        timeout=REQUEST_TIMEOUT,
    )
    payload = response.json()
    latest = payload['data'][-1] if payload['data'] else payload['msg']
    _log(f"最近一次刷新为:{latest}")


def postgresqlCheck():
    """Call ``/postgresql`` and print the JSON response."""
    response = requests.get(
        url + '/postgresql',
        headers=_signed_headers(),
        timeout=REQUEST_TIMEOUT,
    )
    _log(f"云端计算请求:{response.json()}")


def backupdata():
    """Call the backup endpoint and print the JSON response."""
    # NOTE(review): the path is spelled '/bakupdata'; presumably this matches
    # the server route -- confirm before "fixing" the spelling.
    response = requests.get(
        url + '/bakupdata',
        headers=_signed_headers(),
        timeout=REQUEST_TIMEOUT,
    )
    _log(f"5min备份请求:{response.json()}")


def cpu_bound_job():
    """CPU-bound job placeholder intended for the process pool; does nothing."""
    pass


# Register the jobs on the default (thread pool) executor.
scheduler.add_job(getflightdata, 'interval', seconds=30)
scheduler.add_job(getTable, 'interval', seconds=60)
scheduler.add_job(postgresqlCheck, 'interval', seconds=60)
scheduler.add_job(backupdata, 'interval', minutes=5)
# scheduler.add_job(cpu_bound_job, 'interval', seconds=30, executor='processpool')
print('开启定时任务')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler.start()