|
@@ -1,7 +1,10 @@
|
|
import json
|
|
import json
|
|
import requests
|
|
import requests
|
|
|
|
+import time
|
|
|
|
+import hmac
|
|
|
|
+import hashlib
|
|
|
|
|
|
-
|
|
|
|
|
|
+SIGNATURE_KEY=b"secret-key-neverout" # 用于请求签名的密钥
|
|
class api_fucntion:
|
|
class api_fucntion:
|
|
def __init__(self):
|
|
def __init__(self):
|
|
self.url = 'http://localhost:5001'
|
|
self.url = 'http://localhost:5001'
|
|
@@ -10,12 +13,44 @@ class api_fucntion:
|
|
self.access_token = None
|
|
self.access_token = None
|
|
self.refresh_token = None
|
|
self.refresh_token = None
|
|
|
|
|
|
|
|
+ def generate_signature(self,data) :
|
|
|
|
+ """
|
|
|
|
+ 生成请求签名
|
|
|
|
+ :param data: 请求的JSON数据
|
|
|
|
+ :return: (时间戳, 签名)
|
|
|
|
+ """
|
|
|
|
+
|
|
|
|
+ # 1. 获取当前Unix时间戳(秒级)
|
|
|
|
+ timestamp = str(int(time.time()))
|
|
|
|
+ if not data:
|
|
|
|
+ data={"timestamp":timestamp}
|
|
|
|
+ # 2. 对数据进行JSON序列化,确保排序一致
|
|
|
|
+ # sort_keys=True 保证键的顺序一致,否则签名会不匹配
|
|
|
|
+ data_str = json.dumps(data, sort_keys = True).encode('utf-8')
|
|
|
|
+
|
|
|
|
+ # 3. 组合数据:数据字符串 + | + 时间戳
|
|
|
|
+ signature_base = f"{data_str.decode('utf-8')}|{timestamp}".encode('utf-8')
|
|
|
|
+
|
|
|
|
+ # 4. 使用HMAC-SHA256算法生成签名
|
|
|
|
+ signature = hmac.new(
|
|
|
|
+ SIGNATURE_KEY,
|
|
|
|
+ signature_base,
|
|
|
|
+ hashlib.sha256
|
|
|
|
+ ).hexdigest()
|
|
|
|
+
|
|
|
|
+ return timestamp, signature
|
|
|
|
+
|
|
def headers_with_token(self, token):
|
|
def headers_with_token(self, token):
|
|
return {'Content-Type': 'application/json', 'Authorization': token}
|
|
return {'Content-Type': 'application/json', 'Authorization': token}
|
|
|
|
|
|
def get_access_token(self, username, password):
|
|
def get_access_token(self, username, password):
|
|
data = {'username': username, 'password': password}
|
|
data = {'username': username, 'password': password}
|
|
- response = requests.post(self.url + '/loginTable', headers=self.headers, data=json.dumps(data))
|
|
|
|
|
|
+ timestamp, signature=self.generate_signature(data)
|
|
|
|
+ headers={'Content-Type': 'application/json',
|
|
|
|
+ 'X-Timestamp': timestamp,
|
|
|
|
+ 'X-Signature': signature,
|
|
|
|
+ }
|
|
|
|
+ response = requests.post(self.url + '/loginTable', headers=headers, data=json.dumps(data))
|
|
#print(response.json())
|
|
#print(response.json())
|
|
if response.status_code == 200:
|
|
if response.status_code == 200:
|
|
self.access_token = 'Bearer ' + response.json()['access_token']
|
|
self.access_token = 'Bearer ' + response.json()['access_token']
|
|
@@ -56,7 +91,12 @@ class api_fucntion:
|
|
return data
|
|
return data
|
|
|
|
|
|
def request_get_without_token(self, uri):
|
|
def request_get_without_token(self, uri):
|
|
- response = requests.get(self.url + uri, headers=self.headers)
|
|
|
|
|
|
+ timestamp, signature=self.generate_signature(None)
|
|
|
|
+ headers={'Content-Type': 'application/json',
|
|
|
|
+ 'X-Timestamp': timestamp,
|
|
|
|
+ 'X-Signature': signature,
|
|
|
|
+ }
|
|
|
|
+ response = requests.get(self.url + uri, headers=headers)
|
|
data = response.json()
|
|
data = response.json()
|
|
if response.status_code == 200:
|
|
if response.status_code == 200:
|
|
return data
|
|
return data
|
|
@@ -65,7 +105,12 @@ class api_fucntion:
|
|
return data
|
|
return data
|
|
|
|
|
|
def request_post_without_token(self, uri, **kwargs):
|
|
def request_post_without_token(self, uri, **kwargs):
|
|
- response = requests.post(self.url + uri, headers=self.headers, data=json.dumps(kwargs))
|
|
|
|
|
|
+ timestamp, signature=self.generate_signature(kwargs)
|
|
|
|
+ headers={'Content-Type': 'application/json',
|
|
|
|
+ 'X-Timestamp': timestamp,
|
|
|
|
+ 'X-Signature': signature,
|
|
|
|
+ }
|
|
|
|
+ response = requests.post(self.url + uri, headers=headers, data=json.dumps(kwargs))
|
|
data = response.json()
|
|
data = response.json()
|
|
if response.status_code == 200:
|
|
if response.status_code == 200:
|
|
return data
|
|
return data
|