Feat 8.frontend development #22
211
frontend/api.py
211
frontend/api.py
@@ -1,8 +1,7 @@
|
|||||||
|
import urllib
|
||||||
import requests
|
import requests
|
||||||
from json import loads, dumps
|
|
||||||
from types import SimpleNamespace
|
|
||||||
|
|
||||||
URL = "http://127.0.0.1:8000"
|
DEFAULT_URL = "http://127.0.0.1:8000"
|
||||||
API_LISTS_LIST = "api/lists/"
|
API_LISTS_LIST = "api/lists/"
|
||||||
API_LISTS_CREATE = "api/lists/"
|
API_LISTS_CREATE = "api/lists/"
|
||||||
API_LISTS_READ = "lists/{0}/"
|
API_LISTS_READ = "lists/{0}/"
|
||||||
@@ -11,81 +10,155 @@ API_LISTS_PARTIAL_UPDATE = "lists/{0}/"
|
|||||||
API_LISTS_DELETE = "lists/{0}/"
|
API_LISTS_DELETE = "lists/{0}/"
|
||||||
API_TOKEN = "api/token/"
|
API_TOKEN = "api/token/"
|
||||||
|
|
||||||
CODE_SUCCESS = 200
|
|
||||||
JSON_HEADERS = {'content-type': 'application/json', 'accept': 'application/json'}
|
|
||||||
|
|
||||||
get_api = lambda x: URL + "/" + x
|
|
||||||
dump = lambda x: bytes(dumps(x), encoding="utf-8")
|
|
||||||
|
|
||||||
class User(object):
|
class User(object):
|
||||||
|
def __init__(self, url=DEFAULT_URL, token=None):
|
||||||
|
"""
|
||||||
|
Constructor
|
||||||
|
|
||||||
def __init__(self):
|
Parameters
|
||||||
pass
|
----------
|
||||||
|
url : str, optional
|
||||||
|
Server url. The default is DEFAULT_URL.
|
||||||
|
token : dict, optional
|
||||||
|
Existing user tokens to bypass authorization.
|
||||||
|
The default is None.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
None.
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.token = token
|
||||||
|
self.get_api = lambda x: urllib.parse.urljoin(url, x)
|
||||||
|
|
||||||
|
# ToDo - store tokens in config
|
||||||
def auth(self, user, passwd):
|
def auth(self, user, passwd):
|
||||||
url = get_api(API_TOKEN)
|
"""
|
||||||
data = dump({"username": f"{user}", "password": f"{passwd}"})
|
Authosization
|
||||||
code, self.token = User.post(url=url, data=data, headers=JSON_HEADERS)
|
|
||||||
return code
|
Parameters
|
||||||
|
----------
|
||||||
|
user : str
|
||||||
|
Login.
|
||||||
|
passwd : str
|
||||||
|
Password.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
dict
|
||||||
|
Generated auth token.
|
||||||
|
|
||||||
|
"""
|
||||||
|
url = self.get_api(API_TOKEN)
|
||||||
|
data = {"username": user, "password": passwd}
|
||||||
|
response = requests.post(url=url, json=data)
|
||||||
|
response.raise_for_status()
|
||||||
|
self.token = response.json()
|
||||||
|
return self.token
|
||||||
|
|
||||||
def list(self):
|
def list(self):
|
||||||
return User.get(
|
"""
|
||||||
url=get_api(API_LISTS_LIST),
|
List all the exsiting to-do lists.
|
||||||
headers=self._authorised_())
|
Auth required
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
list
|
||||||
|
to-do lists.
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = requests.get(url=self.get_api(API_LISTS_LIST), headers=self._access_token_())
|
||||||
|
response.raise_for_status()
|
||||||
|
return response.json()
|
||||||
|
|
||||||
def create(self, title="Untitled"):
|
def create(self, title="Untitled"):
|
||||||
return User.post(
|
"""
|
||||||
url=get_api(API_LISTS_CREATE),
|
Create a new to-do list
|
||||||
data=dump({"title": title}),
|
|
||||||
headers=self._authorised_())
|
Parameters
|
||||||
|
----------
|
||||||
|
title : str, optional
|
||||||
|
New list name. The default is "Untitled".
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = requests.post(
|
||||||
|
url=self.get_api(API_LISTS_CREATE), json={"title": title}, headers=self._access_token_()
|
||||||
|
)
|
||||||
|
response.raise_for_status()
|
||||||
|
return response.json()
|
||||||
|
|
||||||
def read(self, id):
|
def read(self, id):
|
||||||
return User.post(
|
"""
|
||||||
url=get_api(API_LISTS_READ).format(id),
|
Read a to-do list contents
|
||||||
headers=self._authorised_())
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
id : int
|
||||||
|
List id.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
list
|
||||||
|
Requested contents
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = requests.post(
|
||||||
|
url=self.get_api(API_LISTS_READ).format(id), headers=self._access_token_()
|
||||||
|
)
|
||||||
|
response.raise_for_status()
|
||||||
|
return response.json()
|
||||||
|
|
||||||
def update(self, id, title="Untitled"):
|
def update(self, id, title="Untitled"):
|
||||||
return User.__request__(
|
"""
|
||||||
func=requests.put,
|
Add a to-do item to the list
|
||||||
data=dump({"title": title}),
|
|
||||||
url=get_api(API_LISTS_UPDATE).format(id),
|
Parameters
|
||||||
headers=self._authorised_())
|
----------
|
||||||
|
id : int
|
||||||
|
List id.
|
||||||
|
title : str, optional
|
||||||
|
To-do item title. The default is "Untitled".
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = requests.put(
|
||||||
|
json={"title": title},
|
||||||
|
url=self.get_api(API_LISTS_UPDATE).format(id),
|
||||||
|
headers=self._access_token_(),
|
||||||
|
)
|
||||||
|
response.raise_for_status()
|
||||||
|
return response.json()
|
||||||
|
|
||||||
def partial_update(self, id, title="Untitled"):
|
def partial_update(self, id, title="Untitled"):
|
||||||
return User.__request__(
|
"""
|
||||||
func=requests.patch,
|
Update list item - untrusted
|
||||||
data=dump({"title": title}),
|
|
||||||
url=get_api(API_LISTS_PARTIAL_UPDATE).format(id),
|
"""
|
||||||
headers=self._authorised_())
|
response = requests.patch(
|
||||||
|
json={"title": title},
|
||||||
|
url=self.get_api(API_LISTS_PARTIAL_UPDATE).format(id),
|
||||||
|
headers=self._access_token_(),
|
||||||
|
)
|
||||||
|
response.raise_for_status()
|
||||||
|
return response.json()
|
||||||
|
|
||||||
def delete(self, id):
|
def delete(self, id):
|
||||||
return User.__request__(
|
"""
|
||||||
func=requests.delete,
|
Delete list
|
||||||
url=get_api(API_LISTS_DELETE).format(id),
|
|
||||||
headers=self._authorised_())
|
Parameters
|
||||||
|
----------
|
||||||
def _authorised_(self, headers=JSON_HEADERS):
|
id : int
|
||||||
if not hasattr(self, "token"):
|
List id to delete.
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = requests.delete(
|
||||||
|
url=self.get_api(API_LISTS_DELETE).format(id), headers=self._access_token_()
|
||||||
|
)
|
||||||
|
response.raise_for_status()
|
||||||
|
return response.json()
|
||||||
|
|
||||||
|
def _access_token_(self):
|
||||||
|
if self.token is None:
|
||||||
raise RuntimeError("Authosization required for requested operation!")
|
raise RuntimeError("Authosization required for requested operation!")
|
||||||
headers = headers.copy()
|
return {"Authorization": f'Bearer {self.token["access"]}'}
|
||||||
headers['Authorization'] = f'Bearer {self.token.access}'
|
|
||||||
return headers
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def get(**argv):
|
|
||||||
return User.__request__(requests.get, **argv)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def post(**argv):
|
|
||||||
return User.__request__(requests.post, **argv)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def __request__(func, verbose=True, **argv):
|
|
||||||
with func(**argv) as r:
|
|
||||||
if r.status_code == CODE_SUCCESS:
|
|
||||||
data = loads(r.text)
|
|
||||||
if type(data) is dict:
|
|
||||||
data = SimpleNamespace(**data)
|
|
||||||
else:
|
|
||||||
data = None
|
|
||||||
return r.status_code, data
|
|
||||||
|
|||||||
@@ -1,11 +1,19 @@
|
|||||||
from api import User, CODE_SUCCESS
|
from api import User
|
||||||
|
|
||||||
|
|
||||||
|
def ignore_exceptions(*args, **argv):
|
||||||
|
try:
|
||||||
|
args[0](*(args[1:]), **argv)
|
||||||
|
except Exception as e:
|
||||||
|
print(e)
|
||||||
|
|
||||||
|
|
||||||
user = User()
|
user = User()
|
||||||
print("testing api methods...")
|
print("testing api methods...")
|
||||||
print("auth...", user.auth("root", "root") == CODE_SUCCESS)
|
print("auth..."), ignore_exceptions(user.auth, "root", "root")
|
||||||
print("list...", user.list()[0] == CODE_SUCCESS)
|
print("list..."), ignore_exceptions(user.list)
|
||||||
print("create...", user.create()[0] == CODE_SUCCESS)
|
print("create..."), ignore_exceptions(user.create)
|
||||||
print("read...", user.read(id=0)[0] == CODE_SUCCESS)
|
print("read..."), ignore_exceptions(user.read, id=0)
|
||||||
print("update...", user.update(id=0, title="Title")[0] == CODE_SUCCESS)
|
print("update..."), ignore_exceptions(user.update, id=0, title="Title")
|
||||||
print("partial_update...", user.partial_update(id=0, title="Title")[0] == CODE_SUCCESS)
|
print("partial_update..."), ignore_exceptions(user.partial_update, id=0, title="Title")
|
||||||
print("delete...", user.update(id=0)[0] == CODE_SUCCESS)
|
print("delete..."), ignore_exceptions(user.update, id=0)
|
||||||
|
|||||||
Reference in New Issue
Block a user