1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- import json
- import requests
class api_fucntion:
    """Minimal JSON client that logs in, keeps bearer tokens and retries once on expiry.

    The client talks to the server at ``url``. After ``get_access_token``
    succeeds, ``request_get`` / ``request_post`` send the stored access token
    in the ``Authorization`` header. When the server answers 401 with
    ``{'msg': 'Token has expired'}`` the client calls ``/refresh`` once and
    repeats the request with the new access token.

    Args:
        url: Base URL that every request path is appended to.
        timeout: Passed as ``timeout=`` to every HTTP call. ``None`` (the
            default) means no timeout, which is what ``requests`` does when
            the argument is omitted.
        session: Optional object exposing ``get`` / ``post`` with the same
            call signature as ``requests.get`` / ``requests.post`` (for
            example a ``requests.Session``). When ``None`` the module-level
            ``requests`` functions are used.
    """

    def __init__(self, url='http://localhost:5001', timeout=None, session=None):
        self.url = url
        self.headers = {'Content-Type': 'application/json'}
        self.timeout = timeout
        self.session = session
        # Both tokens are stored already prefixed with 'Bearer '.
        self.access_token = None
        self.refresh_token = None

    def _http(self):
        """Return the object whose ``get``/``post`` perform the HTTP calls."""
        # Resolved lazily so that constructing a client never touches the network layer.
        return requests if self.session is None else self.session

    def headers_with_token(self, token):
        """Return JSON request headers carrying ``token`` as the Authorization value."""
        return {'Content-Type': 'application/json', 'Authorization': token}

    def get_access_token(self, username, password):
        """Log in via ``/loginTable`` and remember the returned tokens.

        Args:
            username: Value sent as ``username`` in the JSON body.
            password: Value sent as ``password`` in the JSON body.

        Returns:
            The ``data`` field of the login response when the server answers
            200, otherwise ``None`` (stored tokens are left untouched).
        """
        credentials = {'username': username, 'password': password}
        response = self._http().post(
            self.url + '/loginTable',
            headers=self.headers,
            data=json.dumps(credentials),
            timeout=self.timeout,
        )
        payload = response.json()  # parse the body once and reuse it
        # NOTE(review): this echoes the raw login response, tokens included, to stdout.
        print(payload)
        if response.status_code != 200:
            return None
        self.access_token = 'Bearer ' + payload['access_token']
        self.refresh_token = 'Bearer ' + payload['refresh_token']
        return payload['data']

    def refresh_access_token(self):
        """Exchange the stored refresh token for a new access token via ``/refresh``.

        Returns:
            ``True`` when the server answers 200 and the new access token was
            stored, ``False`` otherwise (the old access token is kept).
        """
        response = self._http().post(
            self.url + '/refresh',
            headers=self.headers_with_token(self.refresh_token),
            timeout=self.timeout,
        )
        # Check the status before reading the body: a rejected refresh is not
        # expected to carry an 'access_token' field.
        if response.status_code != 200:
            print('Failed to refresh access token')
            return False
        self.access_token = 'Bearer ' + response.json()['access_token']
        print('Successfully refreshed access token')
        return True

    def _request(self, method, uri, **request_kwargs):
        """Send one authenticated request, refreshing the access token once if it expired.

        Args:
            method: Name of the HTTP helper to call, ``'get'`` or ``'post'``.
            uri: Path appended to the base URL.
            **request_kwargs: Extra keyword arguments for the HTTP call.

        Returns:
            The decoded JSON body of the final response. If the token had
            expired and the refresh failed, the 401 body is returned with its
            ``msg`` replaced by ``'Failed to refresh access token'``.
        """
        def send():
            # Headers are rebuilt on every call so a retry uses the refreshed token.
            call = getattr(self._http(), method)
            return call(
                self.url + uri,
                headers=self.headers_with_token(self.access_token),
                timeout=self.timeout,
                **request_kwargs
            )

        response = send()
        data = response.json()
        token_expired = (
            response.status_code == 401
            and isinstance(data, dict)
            and data.get('msg') == 'Token has expired'
        )
        if token_expired:
            if self.refresh_access_token():
                data = send().json()
            else:
                data['msg'] = 'Failed to refresh access token'
        return data

    def request_get(self, uri, **kwargs):
        """GET ``uri`` with the stored access token and return the decoded JSON body.

        NOTE(review): ``**kwargs`` is accepted but not sent with the request;
        confirm with callers whether it was meant to become query parameters.
        """
        return self._request('get', uri)

    def request_post(self, uri, **kwargs):
        """POST ``kwargs`` as a JSON object to ``uri`` and return the decoded JSON body."""
        return self._request('post', uri, data=json.dumps(kwargs))
# Client instance created at import time (presumably shared by importing modules).
api_fc = api_fucntion()

if __name__ == '__main__':
    # Manual smoke test: log in with the demo account, then fetch one resource.
    # Expects the server configured in api_fucntion to be running.
    Api_fc = api_fucntion()
    Api_fc.get_access_token(username='admin', password='admin123')
    # POST example, kept disabled:
    # Api_fc.request_post('/changeSinglepoepleNotice', boolsts='1')
    Api_fc.request_get(uri='/getSinglepoepleNotice')
|