import hashlib
import hmac
import json
import time

import requests

# Key used to sign requests (HMAC-SHA256).
# NOTE(review): the key is hard-coded in source; consider loading it from
# configuration or the environment so it is not committed with the code.
SIGNATURE_KEY = b"secret-key-neverout"


class api_fucntion:
    """Small HTTP client for the backend at ``self.url``.

    Two kinds of requests are supported:

    * token requests, which send the stored access token in the
      ``Authorization`` header and retry once after refreshing the token
      when the server answers 401 with ``msg == 'Token has expired'``;
    * signed requests ("without token"), which send ``X-Timestamp`` and
      ``X-Signature`` headers produced by :meth:`generate_signature`.

    The (misspelled) class name is kept as-is because callers import it.
    """

    def __init__(self):
        self.url = 'http://localhost:5001'
        self.headers = {'Content-Type': 'application/json'}
        # Both tokens are stored already prefixed with 'Bearer '.
        self.access_token = None
        self.refresh_token = None

    def generate_signature(self, data):
        """Build the request signature.

        :param data: JSON-serialisable request data. When falsy (``None`` or
            empty), ``{"timestamp": <timestamp>}`` is signed instead.
        :return: ``(timestamp, signature)`` where ``timestamp`` is the current
            Unix time in whole seconds as a string and ``signature`` is the
            hex HMAC-SHA256 digest of ``"<json>|<timestamp>"``.
        """
        # 1. Current Unix timestamp, second resolution.
        timestamp = str(int(time.time()))
        if not data:
            data = {"timestamp": timestamp}

        # 2. Serialise with sorted keys so the same data always yields the
        #    same string; otherwise the signature would not be reproducible.
        data_str = json.dumps(data, sort_keys=True)

        # 3. Message to sign: data string + "|" + timestamp.
        signature_base = f"{data_str}|{timestamp}".encode('utf-8')

        # 4. HMAC-SHA256 over the message with the shared key.
        signature = hmac.new(
            SIGNATURE_KEY,
            signature_base,
            hashlib.sha256,
        ).hexdigest()

        return timestamp, signature

    def headers_with_token(self, token):
        """Return JSON request headers carrying ``token`` as ``Authorization``."""
        return {'Content-Type': 'application/json', 'Authorization': token}

    def _signed_headers(self, data):
        """Return JSON request headers carrying the signature of ``data``."""
        timestamp, signature = self.generate_signature(data)
        return {
            'Content-Type': 'application/json',
            'X-Timestamp': timestamp,
            'X-Signature': signature,
        }

    def _call_with_token(self, send, uri, **send_kwargs):
        """Send a token request, refreshing the access token once on expiry.

        :param send: ``requests.get`` or ``requests.post``.
        :param uri: path appended to ``self.url``.
        :param send_kwargs: extra keyword arguments forwarded to ``send``.
        :return: the decoded JSON body of the last response received.
        """
        def attempt():
            # Headers are rebuilt on every attempt so that a retry picks up
            # the refreshed access token.
            response = send(
                self.url + uri,
                headers=self.headers_with_token(self.access_token),
                **send_kwargs,
            )
            return response, response.json()

        response, data = attempt()
        token_expired = (
            response.status_code == 401
            and isinstance(data, dict)
            # .get: a 401 body without 'msg' must not raise KeyError.
            and data.get('msg') == 'Token has expired'
        )
        if token_expired:
            if self.refresh_access_token():
                _, data = attempt()
            else:
                data['msg'] = 'Failed to refresh access token'
        return data

    def get_access_token(self, username, password):
        """Log in via ``/loginTable`` and store the returned tokens.

        The request is signed. On HTTP 200 the access and refresh tokens from
        the response are stored with a ``'Bearer '`` prefix.

        :return: the ``'data'`` field of the JSON response (looked up
            regardless of the status code, so a response without that field
            raises ``KeyError``).
        """
        data = {'username': username, 'password': password}
        response = requests.post(
            self.url + '/loginTable',
            headers=self._signed_headers(data),
            data=json.dumps(data),
        )
        payload = response.json()  # parse once instead of once per field
        if response.status_code == 200:
            self.access_token = 'Bearer ' + payload['access_token']
            self.refresh_token = 'Bearer ' + payload['refresh_token']
        return payload['data']

    def refresh_access_token(self):
        """Obtain a new access token from ``/refresh`` using the refresh token.

        :return: ``True`` if the server answered 200 and the access token was
            replaced, ``False`` otherwise (the stored token is left untouched).
        """
        response = requests.post(
            self.url + '/refresh',
            headers=self.headers_with_token(self.refresh_token),
        )
        # Check the status before reading the body: a rejected refresh is not
        # expected to carry an 'access_token' field.
        if response.status_code != 200:
            return False
        self.access_token = 'Bearer ' + response.json()['access_token']
        return True

    def request_get(self, uri, **kwargs):
        """GET ``uri`` with the access token and return the decoded JSON body.

        If the token has expired, it is refreshed and the request is retried
        once; if the refresh fails, the returned body's ``'msg'`` is set to
        ``'Failed to refresh access token'``.

        NOTE(review): ``kwargs`` is accepted but not sent with the request —
        confirm whether it was meant to become query parameters.
        """
        return self._call_with_token(requests.get, uri)

    def request_post(self, uri, **kwargs):
        """POST ``kwargs`` as a JSON body to ``uri`` with the access token.

        Expiry handling and return value are the same as :meth:`request_get`.
        """
        return self._call_with_token(requests.post, uri, data=json.dumps(kwargs))

    def request_get_without_token(self, uri):
        """Signed GET of ``uri``; returns the decoded JSON body.

        On any status other than 200 the body's ``'msg'`` is overwritten with
        ``'Failed to get data'`` before it is returned.
        """
        response = requests.get(self.url + uri, headers=self._signed_headers(None))
        data = response.json()
        if response.status_code != 200:
            data['msg'] = 'Failed to get data'
        return data

    def request_post_without_token(self, uri, **kwargs):
        """Signed POST of ``kwargs`` as a JSON body; returns the decoded JSON body.

        On any status other than 200 the body's ``'msg'`` is overwritten with
        ``'Failed to post data'`` before it is returned.
        """
        response = requests.post(
            self.url + uri,
            headers=self._signed_headers(kwargs),
            data=json.dumps(kwargs),
        )
        data = response.json()
        if response.status_code != 200:
            data['msg'] = 'Failed to post data'
        return data


# Shared module-level client instance.
api_fc = api_fucntion()