# apiFunction.py (2.9 KB)

  1. import json
  2. import requests
  3. class api_fucntion:
  4. def __init__(self):
  5. self.url = 'http://localhost:5001'
  6. self.headers = {'Content-Type': 'application/json'}
  7. #self.headers_with_token = {'Content-Type': 'application/x-www-form-urlencoded', 'Authorization': ''}
  8. self.access_token = None
  9. self.refresh_token = None
  10. def headers_with_token(self, token):
  11. return {'Content-Type': 'application/json', 'Authorization': token}
  12. def get_access_token(self, username, password):
  13. data = {'username': username, 'password': password}
  14. response = requests.post(self.url + '/loginTable', headers=self.headers, data=json.dumps(data))
  15. print(response.json())
  16. if response.status_code == 200:
  17. self.access_token = 'Bearer '+response.json()['access_token']
  18. self.refresh_token = 'Bearer '+response.json()['refresh_token']
  19. return response.json()['data']
  20. def refresh_access_token(self):
  21. response = requests.post(self.url + '/refresh', headers=self.headers_with_token(self.refresh_token))
  22. self.access_token = 'Bearer '+response.json()['access_token']
  23. if response.status_code == 200:
  24. print('Successfully refreshed access token')
  25. return True
  26. else:
  27. print('Failed to refresh access token')
  28. return False
  29. def request_get(self, uri, **kwargs):
  30. response = requests.get(self.url + uri, headers=self.headers_with_token(self.access_token))
  31. data = response.json()
  32. print(type(data))
  33. if response.status_code == 401 and data['msg'] == 'Token has expired':
  34. if self.refresh_access_token():
  35. response = requests.get(self.url + uri, headers=self.headers_with_token(self.access_token))
  36. data = response.json()
  37. else:
  38. data['msg'] = 'Failed to refresh access token'
  39. return data
  40. def request_post(self, uri, **kwargs):
  41. response = requests.post(self.url + uri, headers=self.headers_with_token(self.access_token), data=json.dumps(kwargs))
  42. data = response.json()
  43. if response.status_code == 401 and data['msg'] == 'Token has expired':
  44. if self.refresh_access_token():
  45. response = requests.post(self.url + uri, headers=self.headers_with_token(self.access_token), data=json.dumps(kwargs))
  46. data = response.json()
  47. else:
  48. data['msg'] = 'Failed to refresh access token'
  49. return data
  50. api_fc = api_fucntion()
  51. if __name__ == '__main__':
  52. # access_token, refresh_token = login('jy', '123456')
  53. # time.sleep(11)
  54. # getdict(access_token)
  55. # refreshing_token(refresh_token)
  56. Api_fc = api_fucntion()
  57. Api_fc.get_access_token('admin', 'admin123')
  58. # Api_fc.request_post('/changeSinglepoepleNotice', boolsts='1')
  59. Api_fc.request_get('/getSinglepoepleNotice')