apiFunction.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import json
  2. import requests
  3. import time
  4. import hmac
  5. import hashlib
  6. SIGNATURE_KEY=b"secret-key-neverout" # 用于请求签名的密钥
  7. class api_fucntion:
  8. def __init__(self):
  9. self.url = 'http://localhost:5001'
  10. self.headers = {'Content-Type': 'application/json'}
  11. #self.headers_with_token = {'Content-Type': 'application/x-www-form-urlencoded', 'Authorization': ''}
  12. self.access_token = None
  13. self.refresh_token = None
  14. def generate_signature(self,data) :
  15. """
  16. 生成请求签名
  17. :param data: 请求的JSON数据
  18. :return: (时间戳, 签名)
  19. """
  20. # 1. 获取当前Unix时间戳(秒级)
  21. timestamp = str(int(time.time()))
  22. if not data:
  23. data={"timestamp":timestamp}
  24. # 2. 对数据进行JSON序列化,确保排序一致
  25. # sort_keys=True 保证键的顺序一致,否则签名会不匹配
  26. data_str = json.dumps(data, sort_keys = True).encode('utf-8')
  27. # 3. 组合数据:数据字符串 + | + 时间戳
  28. signature_base = f"{data_str.decode('utf-8')}|{timestamp}".encode('utf-8')
  29. # 4. 使用HMAC-SHA256算法生成签名
  30. signature = hmac.new(
  31. SIGNATURE_KEY,
  32. signature_base,
  33. hashlib.sha256
  34. ).hexdigest()
  35. return timestamp, signature
  36. def headers_with_token(self, token):
  37. return {'Content-Type': 'application/json', 'Authorization': token}
  38. def get_access_token(self, username, password):
  39. data = {'username': username, 'password': password}
  40. timestamp, signature=self.generate_signature(data)
  41. headers={'Content-Type': 'application/json',
  42. 'X-Timestamp': timestamp,
  43. 'X-Signature': signature,
  44. }
  45. response = requests.post(self.url + '/loginTable', headers=headers, data=json.dumps(data))
  46. #print(response.json())
  47. if response.status_code == 200:
  48. self.access_token = 'Bearer ' + response.json()['access_token']
  49. self.refresh_token = 'Bearer ' + response.json()['refresh_token']
  50. return response.json()['data']
  51. def refresh_access_token(self):
  52. response = requests.post(self.url + '/refresh', headers=self.headers_with_token(self.refresh_token))
  53. self.access_token = 'Bearer '+response.json()['access_token']
  54. if response.status_code == 200:
  55. #print('Successfully refreshed access token')
  56. return True
  57. else:
  58. #print('Failed to refresh access token')
  59. return False
  60. def request_get(self, uri, **kwargs):
  61. response = requests.get(self.url + uri, headers=self.headers_with_token(self.access_token))
  62. data = response.json()
  63. print(type(data))
  64. if response.status_code == 401 and data['msg'] == 'Token has expired':
  65. if self.refresh_access_token():
  66. response = requests.get(self.url + uri, headers=self.headers_with_token(self.access_token))
  67. data = response.json()
  68. else:
  69. data['msg'] = 'Failed to refresh access token'
  70. return data
  71. def request_post(self, uri, **kwargs):
  72. response = requests.post(self.url + uri, headers=self.headers_with_token(self.access_token), data=json.dumps(kwargs))
  73. data = response.json()
  74. if response.status_code == 401 and data['msg'] == 'Token has expired':
  75. if self.refresh_access_token():
  76. response = requests.post(self.url + uri, headers=self.headers_with_token(self.access_token), data=json.dumps(kwargs))
  77. data = response.json()
  78. else:
  79. data['msg'] = 'Failed to refresh access token'
  80. return data
  81. def request_get_without_token(self, uri):
  82. timestamp, signature=self.generate_signature(None)
  83. headers={'Content-Type': 'application/json',
  84. 'X-Timestamp': timestamp,
  85. 'X-Signature': signature,
  86. }
  87. response = requests.get(self.url + uri, headers=headers)
  88. data = response.json()
  89. if response.status_code == 200:
  90. return data
  91. else:
  92. data['msg'] = 'Failed to get data'
  93. return data
  94. def request_post_without_token(self, uri, **kwargs):
  95. timestamp, signature=self.generate_signature(kwargs)
  96. headers={'Content-Type': 'application/json',
  97. 'X-Timestamp': timestamp,
  98. 'X-Signature': signature,
  99. }
  100. response = requests.post(self.url + uri, headers=headers, data=json.dumps(kwargs))
  101. data = response.json()
  102. if response.status_code == 200:
  103. return data
  104. else:
  105. data['msg'] = 'Failed to post data'
  106. return data
  107. api_fc = api_fucntion()