"""Watchdog/scheduler script for the local flight-info server.

Periodically calls signed HTTP endpoints on ``url`` using APScheduler and
restarts the server executable when the health check fails.
"""
import datetime
import hashlib
import hmac
import json
import os
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor as ConcurrentThreadPoolExecutor

import requests
import win32api
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.executors.pool import ProcessPoolExecutor, ThreadPoolExecutor
from apscheduler.schedulers.background import BlockingScheduler

from unitls.settings import softVersion

# Executable image names of the previous and current server builds.
oldVersion, nowVersion = softVersion()
current_path = os.getcwd()
if nowVersion:
    # The current build is launched from the working directory.
    path = os.path.join(current_path, nowVersion)
else:
    path = None
# For local debugging the three values above can be overridden by hand, e.g.
# oldVersion = nowVersion = "ServerWithJWT.exe" and path pointing at the
# built executable.
print(path)

# Executor configuration.
executors = {
    'default': ThreadPoolExecutor(20),     # default thread pool, 20 threads
    'processpool': ProcessPoolExecutor(5)  # process pool, 5 processes
}

# Default parameters applied to every job.
job_defaults = {
    'coalesce': False,         # whether to merge runs when jobs pile up
    'max_instances': 3,        # maximum concurrent instances per job
    'misfire_grace_time': 60   # tolerated lateness of a run, in seconds
}

# Create the scheduler with the executors and defaults above.
scheduler = BlockingScheduler(
    executors=executors,
    job_defaults=job_defaults,
    timezone='Asia/Shanghai'
)

# Key used to sign requests.
# NOTE(review): the secret is hard-coded in source; consider loading it from
# configuration or the environment.
SIGNATURE_KEY = b"secret-key-neverout"

# True while a restart is in progress; guards restart() against re-entry.
RestartSts = False
# Makes the check-and-set of RestartSts atomic across scheduler threads.
_restart_lock = threading.Lock()

# Seconds to wait for the health endpoint before treating the server as down.
# Without a timeout a hung server would block the watchdog thread forever
# instead of triggering a restart.
HEALTH_CHECK_TIMEOUT = 30


def _log(message: str) -> None:
    """Print ``message`` prefixed with the current local timestamp."""
    print(f"【{datetime.datetime.now()}】{message}")


def _kill_server(image_name, label: str) -> None:
    """Force-kill the process named ``image_name`` with ``taskkill``.

    ``label`` is the human-readable build name used in the log lines.
    Success is logged only when ``taskkill`` exits with status 0; any other
    status (including "process not found") is logged as a failure.
    """
    if not image_name:
        _log(f"{label}服务器未设置,暂不关闭")
        return
    try:
        # Argument list instead of a shell string: no quoting problems with
        # unusual image names and no shell involved.
        completed = subprocess.run(
            ["taskkill", "/F", "/IM", image_name], check=False
        )
    except OSError:
        # taskkill itself could not be launched.
        _log(f"尝试关闭{label}服务器失败")
        return
    if completed.returncode == 0:
        _log(f"{label}关闭服务器成功")
    else:
        _log(f"尝试关闭{label}服务器失败")


def restart():
    """Kill the old and current server processes and launch the current build.

    Only one restart runs at a time: calls made while ``RestartSts`` is set
    return immediately. After launching, the function sleeps 30 seconds
    before clearing the flag, so health checks that fail during start-up do
    not trigger another restart.
    """
    global RestartSts
    with _restart_lock:
        if RestartSts:
            return
        RestartSts = True
    try:
        _log("尝试关闭服务器")
        _kill_server(oldVersion, "旧版")
        _kill_server(nowVersion, "新版")
        _log("尝试重启服务器")
        try:
            if path:
                win32api.ShellExecute(0, 'open', path, '', '', 1)
                _log("新版启动成功")
            else:
                _log("请先设置服务器启动版本")
        except Exception:
            _log("新版服务器启动失败")
        time.sleep(30)
    finally:
        # Always release the guard, even if the steps above raise.
        RestartSts = False
    _log("重启环节结束")


def generate_signature():
    """Return ``(timestamp, signature)`` for signing a request.

    ``timestamp`` is the current Unix time in whole seconds as a string.
    ``signature`` is the hex HMAC-SHA256, keyed with ``SIGNATURE_KEY``, of
    ``'<json>|<timestamp>'`` where ``<json>`` is ``{"timestamp": ...}``
    serialised with sorted keys.
    """
    timestamp = str(int(time.time()))
    # sort_keys=True keeps the key order stable so both sides sign the
    # same bytes.
    payload = json.dumps({"timestamp": timestamp}, sort_keys=True)
    signature_base = f"{payload}|{timestamp}".encode('utf-8')
    signature = hmac.new(
        SIGNATURE_KEY,
        signature_base,
        hashlib.sha256
    ).hexdigest()
    return timestamp, signature


def _signed_headers() -> dict:
    """Build JSON request headers carrying a fresh timestamp and signature."""
    timestamp, signature = generate_signature()
    return {
        'Content-Type': 'application/json',
        'X-Timestamp': timestamp,
        'X-Signature': signature,
    }


def my_listener(event):
    """Scheduler listener: log whether the finished job raised."""
    if event.exception:
        _log(f"多线程异常,错误信息{event.exception}")
    else:
        _log(f"多线程任务执行正常:{event}")


url = 'http://localhost:5001'
headers = {'Content-Type': 'application/json'}
# Counts getflightdata runs since the last type-'A' refresh.
num = 1


def handle_flight_data_request(req_type):
    """POST a signed ``/getFlightData2`` request for ``req_type`` and log it.

    Any exception is caught and logged; nothing is raised to the caller.
    """
    try:
        response = requests.post(
            url + '/getFlightData2',
            data=json.dumps({'type': req_type}),
            headers=_signed_headers(),
        )
        response_json = response.json()
        _log(f"{req_type}刷新;结果:{response_json}")
    except Exception as e:
        _log(f"{req_type} 请求执行异常: {str(e)}")


def getflightdata():
    """Check the login state and, when logged in, refresh flight data.

    A type-'A' refresh is issued once ``num`` exceeds 2 (after which ``num``
    is reset to 1); otherwise a type-'B' refresh is issued and ``num`` is
    incremented. Errors are logged rather than raised.
    """
    global num
    try:
        response = requests.get(url + '/checkLogin', headers=_signed_headers())
        result = response.json()  # parse the body once
        _log(f"检查账号登录情况;结果为:{result['code']}{result['msg']}")
        if result['code'] == "0":
            _log("AMRO未登录")
            return
        # The request runs on a worker thread so the log line below is
        # emitted while it is in flight; leaving the ``with`` block still
        # waits for the worker to finish.
        with ConcurrentThreadPoolExecutor(max_workers=2) as executor:
            if num > 2:
                executor.submit(handle_flight_data_request, 'A')
                _log("A刷新请求;")
                num = 1
            else:
                executor.submit(handle_flight_data_request, 'B')
                _log("B刷新请求;")
                num += 1
    except Exception as e:
        # Previously swallowed silently; log so failures are visible.
        _log(f"getflightdata 请求执行异常: {e}")


def getTable():
    """GET ``/table`` and log the last ``data`` entry, or ``msg`` if empty."""
    try:
        response = requests.get(url + '/table', headers=_signed_headers())
        result = response.json()
        latest = result['data'][-1] if result['data'] else result['msg']
        _log(f"最近一次刷新为:{latest}")
    except Exception as e:
        _log(f"getTable 请求执行异常: {e}")


def postgresqlCheck():
    """GET ``/postgresql`` and log the JSON response."""
    try:
        response = requests.get(url + '/postgresql', headers=_signed_headers())
        _log(f"云端计算请求:{response.json()}")
    except Exception as e:
        _log(f"postgresqlCheck 请求执行异常: {e}")


def backupdata():
    """GET ``/bakupdata`` and log the JSON response."""
    try:
        response = requests.get(url + '/bakupdata', headers=_signed_headers())
        _log(f"5min备份请求:{response.json()}")
    except Exception as e:
        _log(f"backupdata 请求执行异常: {e}")


def health_check():
    """GET ``/static/health`` and restart the server when it is unhealthy.

    A restart is triggered when the status code is not 200, or when the
    request or JSON decoding raises (including a timeout after
    ``HEALTH_CHECK_TIMEOUT`` seconds).
    """
    try:
        response = requests.get(
            url + '/static/health',
            headers=_signed_headers(),
            timeout=HEALTH_CHECK_TIMEOUT,
        )
        _log(f"监控请求:{response.json()}")
        if response.status_code != 200:
            restart()
        else:
            _log("服务正常")
    except Exception as e:
        _log(f"服务异常:{e}")
        restart()


def cpu_bound_job():
    """Placeholder for a CPU-bound task intended for the process pool."""
    pass


scheduler.add_job(getflightdata, 'interval', seconds=30)
scheduler.add_job(getTable, 'interval', seconds=60)
scheduler.add_job(postgresqlCheck, 'interval', seconds=60)
scheduler.add_job(backupdata, 'interval', minutes=5)
scheduler.add_job(health_check, 'interval', seconds=10)
# cpu_bound_job is currently not scheduled; to enable it, register it on the
# 'processpool' executor:
# scheduler.add_job(cpu_bound_job, 'interval', seconds=30, executor='processpool')
print('开启定时任务')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler.start()