scheduleCenter.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. import datetime
  2. import json
  3. from concurrent.futures import ThreadPoolExecutor as ConcurrentThreadPoolExecutor
  4. import requests
  5. from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
  6. from apscheduler.executors.pool import ThreadPoolExecutor , ProcessPoolExecutor
  7. from apscheduler.schedulers.background import BlockingScheduler
  8. # 配置执行器
  9. executors = {
  10. 'default': ThreadPoolExecutor(20), # 默认线程池,20个线程
  11. 'processpool': ProcessPoolExecutor(5) # 进程池,5个进程
  12. }
  13. # 配置任务默认参数
  14. job_defaults = {
  15. 'coalesce': False, # 任务积压时是否合并执行
  16. 'max_instances': 3, # 最大并发实例数
  17. 'misfire_grace_time': 60 # 任务误点容忍时间(秒)
  18. }
  19. # 创建调度器并配置执行器
  20. scheduler = BlockingScheduler(
  21. executors=executors,
  22. job_defaults=job_defaults,
  23. timezone='Asia/Shanghai'
  24. )
  25. def my_listener(event):
  26. if event.exception:
  27. print(f"【{datetime.datetime.now()}】多线程异常,错误信息{event.exception}")
  28. else:
  29. print(f"【{datetime.datetime.now()}】多线程任务执行正常:{event}")
  30. url = 'http://localhost:5001'
  31. headers = {'Content-Type': 'application/json'}
  32. num = 1
  33. # 添加任务到不同执行器
  34. # 定义A和B请求的处理函数(单独抽离)
  35. def handle_flight_data_request(req_type):
  36. try:
  37. response = requests.post(url + '/getFlightData2',data=json.dumps({'type': req_type}),headers=headers)
  38. response_json = response.json()
  39. print(f"【{datetime.datetime.now()}】{req_type}刷新;结果:{response_json}")
  40. except Exception as e:
  41. print(f"【{datetime.datetime.now()}】{req_type} 请求执行异常: {str(e)}")
  42. def getflightdata():
  43. #print('getflightdata time:{}'.format(datetime.datetime.now()))
  44. global num
  45. """IO密集型任务,使用默认线程池"""
  46. response = requests.get(url+'/checkLogin', headers=headers)
  47. print(f"【{datetime.datetime.now()}】检查账号登录情况;结果为:{response.json()['code']}{response.json()['msg']}")
  48. if response.json()['code'] == "0":
  49. print(f"【{datetime.datetime.now()}】AMRO未登录")
  50. else:
  51. with ConcurrentThreadPoolExecutor(max_workers = 2) as executor :
  52. if num > 2 :
  53. # 异步执行A请求
  54. executor.submit(handle_flight_data_request, 'A')
  55. print(f"【{datetime.datetime.now()}】A刷新请求;")
  56. num = 1
  57. else:
  58. # 异步执行B请求
  59. executor.submit(handle_flight_data_request, 'B')
  60. print(f"【{datetime.datetime.now()}】B刷新请求;")
  61. num += 1
  62. '''
  63. if num >2:
  64. getFlightDataA = requests.post(url+'/getFlightData2', data=json.dumps({'type':'A'}),headers=headers)
  65. # result = getFlightDataA.json()
  66. print(f"【{datetime.datetime.now()}】A 请求刷新;结果:{getFlightDataA.json()}")
  67. num = 1
  68. else:
  69. getFlightDataB = requests.post(url+'/getFlightData2', data=json.dumps({'type':'B'}),headers=headers)
  70. print(f"【{datetime.datetime.now()}】B 请求刷新;结果:{getFlightDataB.json()}")
  71. num += 1
  72. '''
  73. def getTable():
  74. response = requests.get(url+'/table', headers=headers)
  75. print(f"【{datetime.datetime.now()}】最近一次刷新为:{response.json()['data'][-1]}")
  76. #print(result)
  77. def postgresqlCheck():
  78. #print('postgresqlCheck time:{}'.format(datetime.datetime.now()))
  79. response = requests.get(url+'/postgresql', headers=headers)
  80. print(f"【{datetime.datetime.now()}】云端计算请求:{response.json()}")
  81. #print(result)
  82. def backupdata():
  83. #print('backupdata time:{}'.format(datetime.datetime.now()))
  84. response = requests.get(url+'/bakupdata', headers=headers)
  85. print(f"【{datetime.datetime.now()}】5min备份请求:{response.json()}")
  86. def cpu_bound_job():
  87. """CPU密集型任务,使用进程池"""
  88. pass
  89. scheduler.add_job(getflightdata, 'interval', seconds=30)
  90. scheduler.add_job(getTable, 'interval', seconds=60)
  91. scheduler.add_job(postgresqlCheck, 'interval', seconds=60)
  92. scheduler.add_job(backupdata, 'interval', minutes=5)
  93. # scheduler.add_job(cpu_bound_job, 'interval', seconds=30, executor='processpool')
  94. print('开启定时任务')
  95. scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
  96. scheduler.start()