123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- import datetime
- import json
- import time
- import hashlib
- import hmac
- from concurrent.futures import ThreadPoolExecutor as ConcurrentThreadPoolExecutor
- import requests
- from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
- from apscheduler.executors.pool import ThreadPoolExecutor , ProcessPoolExecutor
- from apscheduler.schedulers.background import BlockingScheduler
# Executor pools the scheduler can dispatch jobs to.
executors = dict(
    default=ThreadPoolExecutor(20),      # default executor: thread pool, 20 threads
    processpool=ProcessPoolExecutor(5),  # process pool, 5 processes
)

# Defaults applied to every job.
job_defaults = dict(
    coalesce=False,         # whether backlogged runs are merged into one
    max_instances=3,        # maximum concurrent instances of a job
    misfire_grace_time=60,  # tolerated lateness of a run, in seconds
)

# Create the scheduler and hand it the executors and job defaults.
scheduler = BlockingScheduler(
    executors=executors,
    job_defaults=job_defaults,
    timezone='Asia/Shanghai',
)

# Key used to sign outgoing requests.
# NOTE(review): the secret is hard-coded in source; consider loading it from
# the environment or a secrets store.
SIGNATURE_KEY = b"secret-key-neverout"
def generate_signature(*, timestamp=None, key=None):
    """Build the timestamp/signature pair attached to outgoing requests.

    The signed message is ``<json>|<timestamp>``, where ``<json>`` is the
    key-sorted JSON encoding of ``{"timestamp": <timestamp>}``. It is signed
    with HMAC-SHA256.

    :param timestamp: Unix time in seconds to sign; it is truncated to an
        integer. Defaults to the current time.
    :param key: HMAC key as bytes. Defaults to the module-level
        ``SIGNATURE_KEY``.
    :return: ``(timestamp, signature)`` - the timestamp as a decimal string
        and the signature as a hex digest.
    """
    # 1. Second-resolution Unix timestamp, as a string.
    if timestamp is None:
        timestamp = time.time()
    timestamp = str(int(timestamp))

    if key is None:
        key = SIGNATURE_KEY

    # 2. Serialise with sort_keys=True so the key order (and therefore the
    #    signature) is reproducible on the verifying side.
    payload = json.dumps({"timestamp": timestamp}, sort_keys=True)

    # 3. Message to sign: JSON payload + "|" + timestamp.
    message = f"{payload}|{timestamp}".encode("utf-8")

    # 4. HMAC-SHA256 over the message.
    signature = hmac.new(key, message, hashlib.sha256).hexdigest()
    return timestamp, signature
def my_listener(event):
    """Scheduler event hook: print whether a job run succeeded or raised.

    :param event: job event object; only its ``exception`` attribute is
        inspected, and the whole event is printed on success.
    """
    failure = event.exception
    if not failure:
        print(f"【{datetime.datetime.now()}】多线程任务执行正常:{event}")
        return
    print(f"【{datetime.datetime.now()}】多线程异常,错误信息{failure}")
# Base address of the service every job talks to.
url: str = 'http://localhost:5001'

# Plain JSON headers without signature fields.
headers: dict = {'Content-Type': 'application/json'}

# Run counter read and updated by getflightdata to choose the refresh type.
num: int = 1
# Handler for the A and B refresh requests, factored out on its own.
def handle_flight_data_request(req_type):
    """POST a signed flight-data refresh of the given type and print the result.

    Any exception raised while signing, sending, or decoding the reply is
    caught and printed instead of propagating.

    :param req_type: value placed in the ``type`` field of the JSON body
        (callers in this file pass ``'A'`` or ``'B'``).
    """
    try:
        ts, sig = generate_signature()
        signed_headers = {
            'Content-Type': 'application/json',
            'X-Timestamp': ts,
            'X-Signature': sig,
        }
        endpoint = url + '/getFlightData2'
        body = json.dumps({'type': req_type})
        # NOTE(review): no timeout is passed, so a stalled server blocks this
        # call indefinitely - confirm whether that is intended.
        reply = requests.post(endpoint, data=body, headers=signed_headers)
        result = reply.json()
        print(f"【{datetime.datetime.now()}】{req_type}刷新;结果:{result}")
    except Exception as err:
        print(f"【{datetime.datetime.now()}】{req_type} 请求执行异常: {str(err)}")
def getflightdata():
    """Check the login state and, when logged in, trigger one flight-data refresh.

    I/O-bound job, run on the default thread pool.

    Sends a signed GET to ``/checkLogin`` and prints the ``code`` and ``msg``
    fields of the reply. If ``code`` equals the string ``"0"`` it prints the
    not-logged-in message and stops. Otherwise it submits one refresh
    request, chosen by the module-level counter ``num``: while ``num`` is at
    most 2 a ``'B'`` refresh is submitted and ``num`` is incremented; once it
    exceeds 2 an ``'A'`` refresh is submitted and ``num`` is reset to 1.

    NOTE(review): ``num`` is updated without a lock; if overlapping runs of
    this job are possible, confirm that the race is acceptable.
    """
    global num

    timestamp, signature = generate_signature()
    header = {
        'Content-Type': 'application/json',
        'X-Timestamp': timestamp,
        'X-Signature': signature,
    }
    response = requests.get(url + '/checkLogin', headers=header)
    # Decode the body once instead of re-parsing it for every field access.
    result = response.json()
    print(f"【{datetime.datetime.now()}】检查账号登录情况;结果为:{result['code']}{result['msg']}")

    if result['code'] == "0":
        print(f"【{datetime.datetime.now()}】AMRO未登录")
        return

    # The request runs on a worker thread so the "submitted" line is printed
    # right away; leaving the ``with`` block still waits for the request to
    # finish before this job returns.
    with ConcurrentThreadPoolExecutor(max_workers=2) as executor:
        if num > 2:
            executor.submit(handle_flight_data_request, 'A')
            print(f"【{datetime.datetime.now()}】A刷新请求;")
            num = 1
        else:
            executor.submit(handle_flight_data_request, 'B')
            print(f"【{datetime.datetime.now()}】B刷新请求;")
            num += 1
def getTable():
    """Fetch ``/table`` with signed headers and print the most recent refresh.

    Prints the last element of the reply's ``data`` field when that field is
    non-empty; otherwise prints the reply's ``msg`` field.
    """
    timestamp, signature = generate_signature()
    header = {
        'Content-Type': 'application/json',
        'X-Timestamp': timestamp,
        'X-Signature': signature,
    }
    response = requests.get(url + '/table', headers=header)
    # Decode the body once; it was previously re-parsed for each field access.
    result = response.json()
    rows = result['data']
    latest = rows[-1] if rows else result['msg']
    print(f"【{datetime.datetime.now()}】最近一次刷新为:{latest}")
def postgresqlCheck():
    """Send a signed GET to ``/postgresql`` and print the decoded JSON reply."""
    ts, sig = generate_signature()
    signed_headers = {
        'Content-Type': 'application/json',
        'X-Timestamp': ts,
        'X-Signature': sig,
    }
    reply = requests.get(url + '/postgresql', headers=signed_headers)
    stamp = datetime.datetime.now()
    body = reply.json()
    print(f"【{stamp}】云端计算请求:{body}")
def backupdata():
    """Send a signed GET to the backup endpoint and print the decoded JSON reply."""
    ts, sig = generate_signature()
    signed_headers = {
        'Content-Type': 'application/json',
        'X-Timestamp': ts,
        'X-Signature': sig,
    }
    # NOTE(review): the route is spelled 'bakupdata' - confirm it matches the
    # server before "correcting" it.
    reply = requests.get(url + '/bakupdata', headers=signed_headers)
    stamp = datetime.datetime.now()
    body = reply.json()
    print(f"【{stamp}】5min备份请求:{body}")
def cpu_bound_job():
    """Placeholder for a CPU-bound job meant for the process pool; does nothing yet."""
    return None
# Register the periodic jobs; every one uses the 'interval' trigger.
for _job, _period in (
    (getflightdata, {'seconds': 30}),
    (getTable, {'seconds': 60}),
    (postgresqlCheck, {'seconds': 60}),
    (backupdata, {'minutes': 5}),
):
    scheduler.add_job(_job, 'interval', **_period)
# Process-pool job, currently disabled:
# scheduler.add_job(cpu_bound_job, 'interval', seconds=30, executor='processpool')

print('开启定时任务')
# Report both successful and failed job runs through my_listener.
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# BlockingScheduler.start() runs in the calling thread, so nothing placed
# after this line executes until the scheduler is shut down.
scheduler.start()
|