123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
import datetime
import hashlib
import hmac
import json
import os
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor as ConcurrentThreadPoolExecutor

import requests
import win32api
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.background import BlockingScheduler

from unitls.settings import softVersion
# Names of the previous and the current server executable, as returned by the
# project settings; restart() passes both to `taskkill /IM`.
oldVersion, nowVersion = softVersion()
current_path = os.getcwd()
# Full path of the executable that restart() launches: the current version
# resolved against the working directory, or None when no version is set.
path = os.path.join(current_path, nowVersion) if nowVersion else None
# For local debugging the three values above can be overridden by hand, e.g.
# oldVersion = nowVersion = "ServerWithJWT.exe" and path pointing at the built
# executable under dist\.
print(path)
# Executors available to scheduled jobs.
executors = dict(
    default=ThreadPoolExecutor(20),      # default thread pool, 20 threads
    processpool=ProcessPoolExecutor(5),  # process pool, 5 processes
)
# Defaults applied to every job that does not override them.
job_defaults = dict(
    coalesce=False,         # whether backlogged runs are merged into one
    max_instances=3,        # maximum concurrent instances of one job
    misfire_grace_time=60,  # tolerated lateness of a run, in seconds
)
# Build the scheduler with the executors and job defaults configured above.
scheduler = BlockingScheduler(
    executors=executors,
    job_defaults=job_defaults,
    timezone='Asia/Shanghai',
)
# HMAC key used to sign outgoing requests.
# NOTE(review): the secret is hard-coded in source; consider loading it from
# configuration or the environment instead.
SIGNATURE_KEY: bytes = b"secret-key-neverout"
# True while restart() is running; further restart() calls are ignored until
# it is cleared again.
RestartSts: bool = False
def _kill_server_image(image_name, label):
    """Force-kill every process with the given image name and log the outcome.

    Args:
        image_name: Executable name handed to ``taskkill /IM``. A falsy value
            means this version is not configured, so nothing is killed.
        label: Text spliced into the log messages to identify the version.
    """
    if not image_name:
        print(f"【{datetime.datetime.now()}】{label}服务器未设置,暂不关闭")
        return
    try:
        # An argument list (no shell) passes the name through verbatim, even
        # when it contains spaces or shell metacharacters.
        exit_code = subprocess.run(
            ["taskkill", "/F", "/IM", image_name], check=False
        ).returncode
    except OSError:
        # taskkill itself could not be started.
        exit_code = None
    # Only a zero exit status means taskkill actually terminated something;
    # anything else is reported as a failure instead of a false success.
    if exit_code == 0:
        print(f"【{datetime.datetime.now()}】{label}关闭服务器成功")
    else:
        print(f"【{datetime.datetime.now()}】尝试关闭{label}服务器失败")


def restart():
    """Kill the old and current server executables, then start the current one.

    The module-level ``RestartSts`` flag makes the function a no-op while a
    restart is already in progress. The flag stays set for 30 seconds after
    the launch attempt (presumably to give the server time to come up before
    another health-check failure can trigger a new restart) and is always
    cleared afterwards, even if the sequence is interrupted.
    """
    global RestartSts
    if RestartSts:
        # A restart is already underway; do not stack a second one.
        return
    RestartSts = True
    try:
        print(f"【{datetime.datetime.now()}】尝试关闭服务器")
        _kill_server_image(oldVersion, "旧版")
        _kill_server_image(nowVersion, "新版")
        print(f"【{datetime.datetime.now()}】尝试重启服务器")
        try:
            if path:
                win32api.ShellExecute(0, 'open', path, '', '', 1)
                print(f"【{datetime.datetime.now()}】新版启动成功")
            else:
                print(f"【{datetime.datetime.now()}】请先设置服务器启动版本")
        except Exception:
            print(f"【{datetime.datetime.now()}】新版服务器启动失败")
        time.sleep(30)
    finally:
        # Never leave the guard flag stuck on, or no later restart could run.
        RestartSts = False
    print(f"【{datetime.datetime.now()}】重启环节结束")
def generate_signature():
    """Create the timestamp and HMAC-SHA256 signature for a signed request.

    The signed message is the JSON document ``{"timestamp": <ts>}`` followed
    by ``|`` and the timestamp again, keyed with ``SIGNATURE_KEY``.

    Returns:
        A ``(timestamp, signature)`` tuple: the current Unix time in whole
        seconds as a decimal string, and the hex digest of the signature.
    """
    now_seconds = str(int(time.time()))
    # sort_keys keeps the serialization stable so both sides sign the same text.
    serialized = json.dumps({"timestamp": now_seconds}, sort_keys=True)
    message = "|".join((serialized, now_seconds)).encode('utf-8')
    digest = hmac.new(SIGNATURE_KEY, message, hashlib.sha256)
    return now_seconds, digest.hexdigest()
def my_listener(event):
    """Log the outcome of a scheduler job event.

    Args:
        event: Event object delivered by the scheduler; its ``exception``
            attribute is truthy when the job failed.
    """
    if not event.exception:
        print(f"【{datetime.datetime.now()}】多线程任务执行正常:{event}")
        return
    print(f"【{datetime.datetime.now()}】多线程异常,错误信息{event.exception}")
# Base address of the server that all scheduled requests are sent to.
url: str = 'http://localhost:5001'
# Unsigned JSON headers; the request functions build their own signed headers.
headers: dict = {'Content-Type': 'application/json'}
# Call counter used by getflightdata() to choose between 'A' and 'B' refreshes.
num: int = 1
# Handler for the A and B refresh requests, factored out into its own function.
def handle_flight_data_request(req_type):
    """POST a signed flight-data refresh request and log the JSON reply.

    Args:
        req_type: Refresh type sent as the ``type`` field of the JSON body
            (the caller in this file passes ``'A'`` or ``'B'``).

    Any exception is caught and logged; nothing is raised to the caller.
    """
    try:
        ts, sig = generate_signature()
        signed_headers = {
            'Content-Type': 'application/json',
            'X-Timestamp': ts,
            'X-Signature': sig,
        }
        body = json.dumps({'type': req_type})
        reply = requests.post(url + '/getFlightData2', data=body, headers=signed_headers)
        response_json = reply.json()
    except Exception as e:
        print(f"【{datetime.datetime.now()}】{req_type} 请求执行异常: {str(e)}")
    else:
        print(f"【{datetime.datetime.now()}】{req_type}刷新;结果:{response_json}")
def getflightdata():
    """Check the login state and, when logged in, trigger a flight-data refresh.

    Sends a signed GET to ``/checkLogin``. A ``code`` of ``"0"`` is logged as
    "not logged in" and nothing else happens. Otherwise one refresh request is
    submitted through ``handle_flight_data_request``: the module-level counter
    ``num`` makes every third logged-in run request type ``'A'`` and the other
    runs type ``'B'``.

    Failures are logged rather than raised, so one bad run does not surface as
    a job error in the scheduler.
    """
    global num
    try:
        timestamp, signature = generate_signature()
        header = {
            'Content-Type': 'application/json',
            'X-Timestamp': timestamp,
            'X-Signature': signature,
        }
        response = requests.get(url + '/checkLogin', headers=header)
        # Parse the body once instead of re-decoding it for every field.
        login_state = response.json()
        print(f"【{datetime.datetime.now()}】检查账号登录情况;结果为:{login_state['code']}{login_state['msg']}")
        if login_state['code'] == "0":
            print(f"【{datetime.datetime.now()}】AMRO未登录")
            return
        # Leaving the with-block waits for the submitted request to finish.
        with ConcurrentThreadPoolExecutor(max_workers=2) as executor:
            req_type = 'A' if num > 2 else 'B'
            executor.submit(handle_flight_data_request, req_type)
            print(f"【{datetime.datetime.now()}】{req_type}刷新请求;")
            num = 1 if req_type == 'A' else num + 1
    except Exception as e:
        # Log instead of silently swallowing, so failures are diagnosable.
        print(f"【{datetime.datetime.now()}】getflightdata 请求执行异常: {e}")
def getTable():
    """Fetch ``/table`` with a signed GET and log the most recent entry.

    Logs the last element of the reply's ``data`` list, or the reply's ``msg``
    when ``data`` is empty. Failures are logged rather than raised.
    """
    try:
        timestamp, signature = generate_signature()
        header = {
            'Content-Type': 'application/json',
            'X-Timestamp': timestamp,
            'X-Signature': signature,
        }
        response = requests.get(url + '/table', headers=header)
        # Parse the body once instead of re-decoding it for every field.
        payload = response.json()
        latest = payload['data'][-1] if payload['data'] else payload['msg']
        print(f"【{datetime.datetime.now()}】最近一次刷新为:{latest}")
    except Exception as e:
        # Log instead of silently swallowing, so failures are diagnosable.
        print(f"【{datetime.datetime.now()}】getTable 请求执行异常: {e}")
def postgresqlCheck():
    """Send a signed GET to ``/postgresql`` and log the JSON reply.

    Failures are logged rather than raised.
    """
    try:
        timestamp, signature = generate_signature()
        header = {
            'Content-Type': 'application/json',
            'X-Timestamp': timestamp,
            'X-Signature': signature,
        }
        response = requests.get(url + '/postgresql', headers=header)
        print(f"【{datetime.datetime.now()}】云端计算请求:{response.json()}")
    except Exception as e:
        # Log instead of silently swallowing, so failures are diagnosable.
        print(f"【{datetime.datetime.now()}】postgresqlCheck 请求执行异常: {e}")
def backupdata():
    """Send a signed GET to the backup endpoint and log the JSON reply.

    Failures are logged rather than raised.
    """
    try:
        timestamp, signature = generate_signature()
        header = {
            'Content-Type': 'application/json',
            'X-Timestamp': timestamp,
            'X-Signature': signature,
        }
        # NOTE(review): the route is spelled 'bakupdata'; presumably this
        # matches the server's route name - confirm before "correcting" it.
        response = requests.get(url + '/bakupdata', headers=header)
        print(f"【{datetime.datetime.now()}】5min备份请求:{response.json()}")
    except Exception as e:
        # Log instead of silently swallowing, so failures are diagnosable.
        print(f"【{datetime.datetime.now()}】backupdata 请求执行异常: {e}")
def health_check():
    """Probe the server's health endpoint and restart the server on failure.

    The server counts as healthy only when the signed GET to
    ``/static/health`` completes within the timeout, its body parses as JSON
    and the status code is 200. Any other outcome calls ``restart()``.
    """
    try:
        timestamp, signature = generate_signature()
        header = {
            'Content-Type': 'application/json',
            'X-Timestamp': timestamp,
            'X-Signature': signature,
        }
        # requests waits indefinitely by default, so without a timeout a hung
        # server would block this probe forever and never be restarted.
        response = requests.get(url + '/static/health', headers=header, timeout=10)
        print(f"【{datetime.datetime.now()}】监控请求:{response.json()}")
        healthy = response.status_code == 200
    except Exception as e:
        print(f"【{datetime.datetime.now()}】服务异常:{e}")
        healthy = False
    # restart() runs outside the try so an error raised inside it cannot be
    # mistaken for a failed probe and trigger a second restart.
    if healthy:
        print(f"【{datetime.datetime.now()}】服务正常")
    else:
        restart()
def cpu_bound_job():
    """Placeholder for a CPU-bound job meant to run on the process pool.

    Currently does nothing and returns None.
    """
# Register the periodic jobs; all of them run on the default executor.
scheduler.add_job(getflightdata, trigger='interval', seconds=30)
scheduler.add_job(getTable, trigger='interval', seconds=60)
scheduler.add_job(postgresqlCheck, trigger='interval', seconds=60)
scheduler.add_job(backupdata, trigger='interval', minutes=5)
scheduler.add_job(health_check, trigger='interval', seconds=10)
# scheduler.add_job(cpu_bound_job, 'interval', seconds=30, executor='processpool')
print('开启定时任务')
# Report every finished or failed job run through my_listener.
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# Blocks the main thread and runs the jobs until the scheduler is shut down.
scheduler.start()
|