# scheduleCenter.py (9.2 KB)
import datetime
import hashlib
import hmac
import json
import os
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor as ConcurrentThreadPoolExecutor

import requests
import win32api
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.background import BlockingScheduler

from unitls.settings import softVersion
  14. oldVersion,nowVersion = softVersion()
  15. current_path = os.getcwd()
  16. if nowVersion:
  17. path = os.path.join(current_path,nowVersion)
  18. else:
  19. path = None
  20. #oldVersion="ServerWithJWT.exe"
  21. #nowVersion="ServerWithJWT.exe"
  22. #path=os.path.join("D:\\flightinfoV3_sever\dist\ServerWithJWT.exe")
  23. print(path)
  24. # 配置执行器
  25. executors = {
  26. 'default': ThreadPoolExecutor(20), # 默认线程池,20个线程
  27. 'processpool': ProcessPoolExecutor(5) # 进程池,5个进程
  28. }
  29. # 配置任务默认参数
  30. job_defaults = {
  31. 'coalesce': False, # 任务积压时是否合并执行
  32. 'max_instances': 3, # 最大并发实例数
  33. 'misfire_grace_time': 60 # 任务误点容忍时间(秒)
  34. }
  35. # 创建调度器并配置执行器
  36. scheduler = BlockingScheduler(
  37. executors=executors,
  38. job_defaults=job_defaults,
  39. timezone='Asia/Shanghai'
  40. )
  41. SIGNATURE_KEY=b"secret-key-neverout" # 用于请求签名的密钥
  42. RestartSts=False
  43. def restart():
  44. global RestartSts
  45. if not RestartSts:
  46. RestartSts=True
  47. print(f"【{datetime.datetime.now()}】尝试关闭服务器")
  48. try:
  49. if oldVersion:
  50. os.system("taskkill /F /IM %s" % oldVersion)
  51. print(f"【{datetime.datetime.now()}】旧版关闭服务器成功")
  52. else:
  53. print(f"【{datetime.datetime.now()}】旧版服务器未设置,暂不关闭")
  54. except:
  55. print(f"【{datetime.datetime.now()}】尝试关闭旧版服务器失败")
  56. try:
  57. if nowVersion:
  58. os.system("taskkill /F /IM %s" % nowVersion)
  59. print(f"【{datetime.datetime.now()}】新版关闭服务器成功")
  60. else:
  61. print(f"【{datetime.datetime.now()}】新版服务器未设置,暂不关闭")
  62. except:
  63. print(f"【{datetime.datetime.now()}】尝试关闭新版服务器失败")
  64. print(f"【{datetime.datetime.now()}】尝试重启服务器")
  65. try:
  66. if path:
  67. win32api.ShellExecute(0 , 'open' , path , '' , '' , 1)
  68. print(f"【{datetime.datetime.now()}】新版启动成功")
  69. else:
  70. print(f"【{datetime.datetime.now()}】请先设置服务器启动版本")
  71. except:
  72. print(f"【{datetime.datetime.now()}】新版服务器启动失败")
  73. time.sleep(30)
  74. RestartSts=False
  75. print(f"【{datetime.datetime.now()}】重启环节结束")
  76. def generate_signature() :
  77. # 1. 获取当前Unix时间戳(秒级)
  78. timestamp = str(int(time.time()))
  79. data = { "timestamp" : timestamp }
  80. # 2. 对数据进行JSON序列化,确保排序一致
  81. # sort_keys=True 保证键的顺序一致,否则签名会不匹配
  82. data_str = json.dumps(data , sort_keys = True).encode('utf-8')
  83. # 3. 组合数据:数据字符串 + | + 时间戳
  84. signature_base = f"{data_str.decode('utf-8')}|{timestamp}".encode('utf-8')
  85. # 4. 使用HMAC-SHA256算法生成签名
  86. signature = hmac.new(
  87. SIGNATURE_KEY ,
  88. signature_base ,
  89. hashlib.sha256
  90. ).hexdigest()
  91. return timestamp , signature
  92. def my_listener(event):
  93. if event.exception:
  94. print(f"【{datetime.datetime.now()}】多线程异常,错误信息{event.exception}")
  95. else:
  96. print(f"【{datetime.datetime.now()}】多线程任务执行正常:{event}")
  97. url = 'http://localhost:5001'
  98. headers = {'Content-Type': 'application/json'}
  99. num = 1
  100. # 添加任务到不同执行器
  101. # 定义A和B请求的处理函数(单独抽离)
  102. def handle_flight_data_request(req_type):
  103. try:
  104. timestamp , signature = generate_signature()
  105. header = { 'Content-Type' : 'application/json' ,
  106. 'X-Timestamp' : timestamp ,
  107. 'X-Signature' : signature ,
  108. }
  109. response = requests.post(url + '/getFlightData2',data=json.dumps({'type': req_type}),headers=header)
  110. response_json = response.json()
  111. print(f"【{datetime.datetime.now()}】{req_type}刷新;结果:{response_json}")
  112. except Exception as e:
  113. print(f"【{datetime.datetime.now()}】{req_type} 请求执行异常: {str(e)}")
  114. def getflightdata():
  115. try:
  116. #print('getflightdata time:{}'.format(datetime.datetime.now()))
  117. global num
  118. """IO密集型任务,使用默认线程池"""
  119. timestamp , signature = generate_signature()
  120. header = {'Content-Type': 'application/json',
  121. 'X-Timestamp': timestamp,
  122. 'X-Signature': signature,
  123. }
  124. response = requests.get(url+'/checkLogin', headers=header)
  125. print(f"【{datetime.datetime.now()}】检查账号登录情况;结果为:{response.json()['code']}{response.json()['msg']}")
  126. if response.json()['code'] == "0":
  127. print(f"【{datetime.datetime.now()}】AMRO未登录")
  128. else:
  129. with ConcurrentThreadPoolExecutor(max_workers = 2) as executor :
  130. if num > 2 :
  131. # 异步执行A请求
  132. executor.submit(handle_flight_data_request, 'A')
  133. print(f"【{datetime.datetime.now()}】A刷新请求;")
  134. num = 1
  135. else:
  136. # 异步执行B请求
  137. executor.submit(handle_flight_data_request, 'B')
  138. print(f"【{datetime.datetime.now()}】B刷新请求;")
  139. num += 1
  140. '''
  141. if num >2:
  142. getFlightDataA = requests.post(url+'/getFlightData2', data=json.dumps({'type':'A'}),headers=headers)
  143. # result = getFlightDataA.json()
  144. print(f"【{datetime.datetime.now()}】A 请求刷新;结果:{getFlightDataA.json()}")
  145. num = 1
  146. else:
  147. getFlightDataB = requests.post(url+'/getFlightData2', data=json.dumps({'type':'B'}),headers=headers)
  148. print(f"【{datetime.datetime.now()}】B 请求刷新;结果:{getFlightDataB.json()}")
  149. num += 1
  150. '''
  151. except:
  152. pass
  153. def getTable():
  154. try:
  155. timestamp , signature = generate_signature()
  156. header = {'Content-Type': 'application/json',
  157. 'X-Timestamp': timestamp,
  158. 'X-Signature': signature,
  159. }
  160. response = requests.get(url+'/table', headers=header)
  161. print(f"【{datetime.datetime.now()}】最近一次刷新为:{response.json()['data'][-1] if response.json()['data'] else response.json()['msg'] }")
  162. #print(result)
  163. except:
  164. pass
  165. def postgresqlCheck():
  166. try:
  167. timestamp , signature = generate_signature()
  168. header = {'Content-Type': 'application/json',
  169. 'X-Timestamp': timestamp,
  170. 'X-Signature': signature,
  171. }
  172. #print('postgresqlCheck time:{}'.format(datetime.datetime.now()))
  173. response = requests.get(url+'/postgresql', headers=header)
  174. print(f"【{datetime.datetime.now()}】云端计算请求:{response.json()}")
  175. #print(result)
  176. except:
  177. pass
  178. def backupdata():
  179. try:
  180. timestamp , signature = generate_signature()
  181. header = {'Content-Type': 'application/json',
  182. 'X-Timestamp': timestamp,
  183. 'X-Signature': signature,
  184. }
  185. #print('backupdata time:{}'.format(datetime.datetime.now()))
  186. response = requests.get(url+'/bakupdata', headers=header)
  187. print(f"【{datetime.datetime.now()}】5min备份请求:{response.json()}")
  188. except:
  189. pass
  190. def health_check():
  191. try:
  192. timestamp , signature = generate_signature()
  193. header = {'Content-Type': 'application/json',
  194. 'X-Timestamp': timestamp,
  195. 'X-Signature': signature,
  196. }
  197. #print('backupdata time:{}'.format(datetime.datetime.now()))
  198. response = requests.get(url+'/static/health', headers=header)
  199. print(f"【{datetime.datetime.now()}】监控请求:{response.json()}")
  200. if response.status_code != 200:
  201. restart()
  202. else:
  203. print(f"【{datetime.datetime.now()}】服务正常")
  204. except Exception as e:
  205. print(f"【{datetime.datetime.now()}】服务异常:{e}")
  206. restart()
  207. def cpu_bound_job():
  208. """CPU密集型任务,使用进程池"""
  209. pass
  210. scheduler.add_job(getflightdata, 'interval', seconds=30)
  211. scheduler.add_job(getTable, 'interval', seconds=60)
  212. scheduler.add_job(postgresqlCheck, 'interval', seconds=60)
  213. scheduler.add_job(backupdata, 'interval', minutes=5)
  214. scheduler.add_job(health_check, 'interval', seconds=10)
  215. # scheduler.add_job(cpu_bound_job, 'interval', seconds=30, executor='processpool')
  216. print('开启定时任务')
  217. scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
  218. scheduler.start()