scheduleCenter.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. import datetime
  2. import json
  3. import time
  4. import hashlib
  5. import hmac
  6. from concurrent.futures import ThreadPoolExecutor as ConcurrentThreadPoolExecutor
  7. import requests
  8. from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
  9. from apscheduler.executors.pool import ThreadPoolExecutor , ProcessPoolExecutor
  10. from apscheduler.schedulers.background import BlockingScheduler
  11. # 配置执行器
  12. executors = {
  13. 'default': ThreadPoolExecutor(20), # 默认线程池,20个线程
  14. 'processpool': ProcessPoolExecutor(5) # 进程池,5个进程
  15. }
  16. # 配置任务默认参数
  17. job_defaults = {
  18. 'coalesce': False, # 任务积压时是否合并执行
  19. 'max_instances': 3, # 最大并发实例数
  20. 'misfire_grace_time': 60 # 任务误点容忍时间(秒)
  21. }
  22. # 创建调度器并配置执行器
  23. scheduler = BlockingScheduler(
  24. executors=executors,
  25. job_defaults=job_defaults,
  26. timezone='Asia/Shanghai'
  27. )
  28. SIGNATURE_KEY=b"secret-key-neverout" # 用于请求签名的密钥
  29. def generate_signature() :
  30. """
  31. 生成请求签名
  32. :param data: 请求的JSON数据
  33. :return: (时间戳, 签名)
  34. """
  35. # 1. 获取当前Unix时间戳(秒级)
  36. timestamp = str(int(time.time()))
  37. data = { "timestamp" : timestamp }
  38. # 2. 对数据进行JSON序列化,确保排序一致
  39. # sort_keys=True 保证键的顺序一致,否则签名会不匹配
  40. data_str = json.dumps(data , sort_keys = True).encode('utf-8')
  41. # 3. 组合数据:数据字符串 + | + 时间戳
  42. signature_base = f"{data_str.decode('utf-8')}|{timestamp}".encode('utf-8')
  43. # 4. 使用HMAC-SHA256算法生成签名
  44. signature = hmac.new(
  45. SIGNATURE_KEY ,
  46. signature_base ,
  47. hashlib.sha256
  48. ).hexdigest()
  49. return timestamp , signature
  50. def my_listener(event):
  51. if event.exception:
  52. print(f"【{datetime.datetime.now()}】多线程异常,错误信息{event.exception}")
  53. else:
  54. print(f"【{datetime.datetime.now()}】多线程任务执行正常:{event}")
  55. url = 'http://localhost:5001'
  56. headers = {'Content-Type': 'application/json'}
  57. num = 1
  58. # 添加任务到不同执行器
  59. # 定义A和B请求的处理函数(单独抽离)
  60. def handle_flight_data_request(req_type):
  61. try:
  62. timestamp , signature = generate_signature()
  63. header = { 'Content-Type' : 'application/json' ,
  64. 'X-Timestamp' : timestamp ,
  65. 'X-Signature' : signature ,
  66. }
  67. response = requests.post(url + '/getFlightData2',data=json.dumps({'type': req_type}),headers=header)
  68. response_json = response.json()
  69. print(f"【{datetime.datetime.now()}】{req_type}刷新;结果:{response_json}")
  70. except Exception as e:
  71. print(f"【{datetime.datetime.now()}】{req_type} 请求执行异常: {str(e)}")
  72. def getflightdata():
  73. #print('getflightdata time:{}'.format(datetime.datetime.now()))
  74. global num
  75. """IO密集型任务,使用默认线程池"""
  76. timestamp , signature = generate_signature()
  77. header = {'Content-Type': 'application/json',
  78. 'X-Timestamp': timestamp,
  79. 'X-Signature': signature,
  80. }
  81. response = requests.get(url+'/checkLogin', headers=header)
  82. print(f"【{datetime.datetime.now()}】检查账号登录情况;结果为:{response.json()['code']}{response.json()['msg']}")
  83. if response.json()['code'] == "0":
  84. print(f"【{datetime.datetime.now()}】AMRO未登录")
  85. else:
  86. with ConcurrentThreadPoolExecutor(max_workers = 2) as executor :
  87. if num > 2 :
  88. # 异步执行A请求
  89. executor.submit(handle_flight_data_request, 'A')
  90. print(f"【{datetime.datetime.now()}】A刷新请求;")
  91. num = 1
  92. else:
  93. # 异步执行B请求
  94. executor.submit(handle_flight_data_request, 'B')
  95. print(f"【{datetime.datetime.now()}】B刷新请求;")
  96. num += 1
  97. '''
  98. if num >2:
  99. getFlightDataA = requests.post(url+'/getFlightData2', data=json.dumps({'type':'A'}),headers=headers)
  100. # result = getFlightDataA.json()
  101. print(f"【{datetime.datetime.now()}】A 请求刷新;结果:{getFlightDataA.json()}")
  102. num = 1
  103. else:
  104. getFlightDataB = requests.post(url+'/getFlightData2', data=json.dumps({'type':'B'}),headers=headers)
  105. print(f"【{datetime.datetime.now()}】B 请求刷新;结果:{getFlightDataB.json()}")
  106. num += 1
  107. '''
  108. def getTable():
  109. timestamp , signature = generate_signature()
  110. header = {'Content-Type': 'application/json',
  111. 'X-Timestamp': timestamp,
  112. 'X-Signature': signature,
  113. }
  114. response = requests.get(url+'/table', headers=header)
  115. print(f"【{datetime.datetime.now()}】最近一次刷新为:{response.json()['data'][-1] if response.json()['data'] else response.json()['msg'] }")
  116. #print(result)
  117. def postgresqlCheck():
  118. timestamp , signature = generate_signature()
  119. header = {'Content-Type': 'application/json',
  120. 'X-Timestamp': timestamp,
  121. 'X-Signature': signature,
  122. }
  123. #print('postgresqlCheck time:{}'.format(datetime.datetime.now()))
  124. response = requests.get(url+'/postgresql', headers=header)
  125. print(f"【{datetime.datetime.now()}】云端计算请求:{response.json()}")
  126. #print(result)
  127. def backupdata():
  128. timestamp , signature = generate_signature()
  129. header = {'Content-Type': 'application/json',
  130. 'X-Timestamp': timestamp,
  131. 'X-Signature': signature,
  132. }
  133. #print('backupdata time:{}'.format(datetime.datetime.now()))
  134. response = requests.get(url+'/bakupdata', headers=header)
  135. print(f"【{datetime.datetime.now()}】5min备份请求:{response.json()}")
  136. def cpu_bound_job():
  137. """CPU密集型任务,使用进程池"""
  138. pass
  139. scheduler.add_job(getflightdata, 'interval', seconds=30)
  140. scheduler.add_job(getTable, 'interval', seconds=60)
  141. scheduler.add_job(postgresqlCheck, 'interval', seconds=60)
  142. scheduler.add_job(backupdata, 'interval', minutes=5)
  143. # scheduler.add_job(cpu_bound_job, 'interval', seconds=30, executor='processpool')
  144. print('开启定时任务')
  145. scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
  146. scheduler.start()