Skip to content

Commit

Permalink
Fix mypy + type del + remove deprecated + comment class
Browse files Browse the repository at this point in the history
  • Loading branch information
Quentame authored Oct 18, 2023
1 parent 817b57b commit 4c8c876
Show file tree
Hide file tree
Showing 17 changed files with 139 additions and 124 deletions.
79 changes: 49 additions & 30 deletions src/freebox_api/access.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from typing import Dict
from urllib.parse import urljoin

from aiohttp import ClientSession

from freebox_api.exceptions import AuthorizationError
from freebox_api.exceptions import HttpRequestError
from freebox_api.exceptions import InsufficientPermissionsError
Expand All @@ -13,7 +15,14 @@


class Access:
def __init__(self, session, base_url, app_token, app_id, http_timeout):
def __init__(
self,
session: ClientSession,
base_url: str,
app_token: str,
app_id: str,
http_timeout: int,
):
self.session = session
self.base_url = base_url
self.app_token = app_token
Expand All @@ -27,16 +36,18 @@ async def _get_challenge(self, base_url, timeout=10):
Return challenge from freebox API
"""
url = urljoin(base_url, "login")
r = await self.session.get(url, timeout=timeout)
resp = await r.json()
resp = await self.session.get(url, timeout=timeout)
resp_data = await resp.json()

# raise exception if resp.success != True
if not resp.get("success"):
if not resp_data.get("success"):
raise AuthorizationError(
"Getting challenge failed (APIResponse: {})".format(json.dumps(resp))
"Getting challenge failed (APIResponse: {})".format(
json.dumps(resp_data)
)
)

return resp["result"]["challenge"]
return resp_data["result"]["challenge"]

async def _get_session_token(self, base_url, app_token, app_id, timeout=10):
"""
Expand All @@ -52,17 +63,19 @@ async def _get_session_token(self, base_url, app_token, app_id, timeout=10):

url = urljoin(base_url, "login/session/")
data = json.dumps({"app_id": app_id, "password": password})
r = await self.session.post(url, data=data, timeout=timeout)
resp = await r.json()
resp = await self.session.post(url, data=data, timeout=timeout)
resp_data = await resp.json()

# raise exception if resp.success != True
if not resp.get("success"):
if not resp_data.get("success"):
raise AuthorizationError(
"Starting session failed (APIResponse: {})".format(json.dumps(resp))
"Starting session failed (APIResponse: {})".format(
json.dumps(resp_data)
)
)

session_token = resp["result"].get("session_token")
session_permissions = resp["result"].get("permissions")
session_token = resp_data["result"].get("session_token")
session_permissions = resp_data["result"].get("permissions")

return (session_token, session_permissions)

Expand Down Expand Up @@ -93,54 +106,60 @@ async def _perform_request(self, verb, end_url, **kwargs):
"headers": self._get_headers(),
"timeout": self.timeout,
}
r = await verb(url, **request_params)
resp = await verb(url, **request_params)

# Return response if content is not json
if r.content_type != "application/json":
return r
if resp.content_type != "application/json":
return resp

resp = await r.json()
if resp.get("error_code") in ["auth_required", "invalid_session"]:
resp_data = await resp.json()
if resp_data.get("error_code") in ["auth_required", "invalid_session"]:
logger.debug("Invalid session")
await self._refresh_session_token()
request_params["headers"] = self._get_headers()
r = await verb(url, **request_params)
resp = await r.json()
resp = await verb(url, **request_params)
resp_data = await resp.json()

if not resp["success"]:
err_msg = "Request failed (APIResponse: {})".format(json.dumps(resp))
if resp.get("error_code") == "insufficient_rights":
if not resp_data["success"]:
err_msg = "Request failed (APIResponse: {})".format(json.dumps(resp_data))
if resp_data.get("error_code") == "insufficient_rights":
raise InsufficientPermissionsError(err_msg)
raise HttpRequestError(err_msg)

return resp["result"] if "result" in resp else None
return resp_data.get("result")

async def get(self, end_url: str) -> Dict[str, Any]:
async def get(self, end_url: str) -> Any: # Dict[str, Any] | List[Dict[str, Any]]:
"""
Send get request and return results
"""
return await self._perform_request(self.session.get, end_url)

async def post(self, end_url: str, payload=None) -> Dict[str, Any]:
async def post(
self, end_url: str, payload: Dict[str, Any] | None = None
) -> Dict[str, Any]:
"""
Send post request and return results
"""
data = json.dumps(payload) if payload else None
return await self._perform_request(self.session.post, end_url, data=data)
return await self._perform_request(self.session.post, end_url, data=data) # type: ignore

async def put(self, end_url: str, payload=None) -> Dict[str, Any]:
async def put(
self, end_url: str, payload: Dict[str, Any] | None = None
) -> Dict[str, Any]:
"""
Send post request and return results
"""
data = json.dumps(payload) if payload else None
return await self._perform_request(self.session.put, end_url, data=data)
return await self._perform_request(self.session.put, end_url, data=data) # type: ignore

async def delete(self, end_url: str, payload=None):
async def delete(
self, end_url: str, payload: Dict[str, Any] | None = None
) -> Dict[str, bool] | None:
"""
Send delete request and return results
"""
data = json.dumps(payload) if payload else None
return await self._perform_request(self.session.delete, end_url, data=data)
return await self._perform_request(self.session.delete, end_url, data=data) # type: ignore

async def get_permissions(self) -> Dict[str, bool] | None:
"""
Expand Down
45 changes: 25 additions & 20 deletions src/freebox_api/aiofreepybox.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import ssl
from typing import Any
from typing import Dict
from typing import Tuple
from urllib.parse import urljoin

from aiohttp import ClientSession
Expand Down Expand Up @@ -153,7 +154,7 @@ async def _get_freebox_access(
port: str,
api_version: str,
token_file: str,
app_desc,
app_desc: Dict[str, str],
timeout: int = 10,
) -> Access:
"""
Expand Down Expand Up @@ -214,7 +215,7 @@ async def _get_freebox_access(

async def _get_authorization_status(
self, base_url: str, track_id: int, timeout: int
):
) -> str:
"""
Get authorization status of the application token
Expand All @@ -226,54 +227,58 @@ async def _get_authorization_status(
denied: the user denied the authorization request
"""
url = urljoin(base_url, f"login/authorize/{track_id}")
r = await self._session.get(url, timeout=timeout)
resp = await r.json()
return resp["result"]["status"]
resp = await self._session.get(url, timeout=timeout)
resp_data = await resp.json()
return str(resp_data["result"]["status"])

async def _get_app_token(
self, base_url: str, app_desc: Dict[str, str], timeout: int = 10
) -> (str, int):
) -> Tuple[str, int]:
"""
Get the application token from the freebox
Returns (app_token, track_id)
"""
# Get authentification token
url = urljoin(base_url, "login/authorize/")
data = json.dumps(app_desc)
r = await self._session.post(url, data=data, timeout=timeout)
resp = await r.json()
resp = await self._session.post(url, data=data, timeout=timeout)
resp_data = await resp.json()

# raise exception if resp.success != True
if not resp.get("success"):
if not resp_data.get("success"):
raise AuthorizationError(
"Authorization failed (APIResponse: {0})".format(json.dumps(resp))
"Authorization failed (APIResponse: {0})".format(json.dumps(resp_data))
)

app_token: str = resp["result"]["app_token"]
track_id: int = resp["result"]["track_id"]
app_token: str = resp_data["result"]["app_token"]
track_id: int = resp_data["result"]["track_id"]

return (app_token, track_id)

def _writefile_app_token(
self, app_token: str, track_id: int, app_desc: Dict[str, str], file
):
self, app_token: str, track_id: int, app_desc: Dict[str, str], token_file: str
) -> None:
"""
Store the application token in g_app_auth_file file
"""
d = {**app_desc, "app_token": app_token, "track_id": track_id}
file_content: Dict[str, str | int] = {
**app_desc,
"app_token": app_token,
"track_id": track_id,
}

with open(file, "w") as f:
json.dump(d, f)
with open(token_file, "w") as f:
json.dump(file_content, f)

def _readfile_app_token(
self, file: str
) -> (str, int, Dict[str, Any]) | (None, None, None):
self, token_file: str
) -> Tuple[str, int, Dict[str, Any]] | Tuple[None, None, None]:
"""
Read the application token in the authentication file.
Returns (app_token, track_id, app_desc)
"""
try:
with open(file, "r") as f:
with open(token_file, "r") as f:
d = json.load(f)
app_token: str = d["app_token"]
track_id: int = d["track_id"]
Expand Down
5 changes: 4 additions & 1 deletion src/freebox_api/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
__all__ = []
"""Freebox APIs."""
from typing import List

__all__: List[str] = []
5 changes: 3 additions & 2 deletions src/freebox_api/api/airmedia.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""AirMedia API."""
from typing import Any
from typing import Dict
from typing import List
Expand Down Expand Up @@ -29,7 +30,7 @@ async def get_airmedia_receivers(self) -> List[Dict[str, Any]]:
"""
Get AirMedia receivers
"""
return await self._access.get("airmedia/receivers/")
return await self._access.get("airmedia/receivers/") # type: ignore

async def send_airmedia(
self, receiver_name: str, airmedia_data: Dict[str, Any]
Expand All @@ -46,7 +47,7 @@ async def get_airmedia_configuration(self) -> Dict[str, bool]:
"""
Get AirMedia configuration
"""
return await self._access.get("airmedia/config/")
return await self._access.get("airmedia/config/") # type: ignore

async def set_airmedia_configuration(
self, airmedia_config: Dict[str, Any]
Expand Down
23 changes: 6 additions & 17 deletions src/freebox_api/api/call.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import logging
"""Call API."""
from typing import Dict

from freebox_api.access import Access

_LOGGER = logging.getLogger(__name__)


class Call:
"""
Expand All @@ -15,19 +14,19 @@ def __init__(self, access: Access):

mark_call_log_as_read_data_schema = {"new": False}

async def delete_call_log(self, log_id):
async def delete_call_log(self, log_id: int) -> Dict[str, bool]:
"""
Delete call log
log_id : `int`
"""
await self._access.delete(f"call/log/{log_id}")
return await self._access.delete(f"call/log/{log_id}") # type: ignore

async def delete_calls_log(self):
async def delete_calls_log(self) -> None:
"""
Delete calls log
"""
await self._access.delete("call/log/delete_all/")
return await self._access.delete("call/log/delete_all/") # type: ignore

async def get_call_log(self, log_id):
"""
Expand All @@ -41,16 +40,6 @@ async def get_calls_log(self):
"""
return await self._access.get("call/log/")

# TODO: remove
async def get_call_list(self):
"""
Returns the collection of all call entries
"""
_LOGGER.warning(
"Using deprecated get_call_list, please use get_calls_log instead"
)
return await self.get_calls_log()

async def mark_calls_log_as_read(self):
"""
Mark calls log as read
Expand Down
4 changes: 2 additions & 2 deletions src/freebox_api/api/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ async def get_xdsl(self):
"""
return await self._access.get("connection/xdsl/")

async def remove_connection_logs(self):
async def remove_connection_logs(self) -> None:
"""
Remove connection logs
"""
return await self._access.delete("connection/logs/")
return await self._access.delete("connection/logs/") # type: ignore

async def set_config(self, connection_configuration):
"""
Expand Down
27 changes: 3 additions & 24 deletions src/freebox_api/api/dhcp.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from typing import Dict

from freebox_api.access import Access

Expand Down Expand Up @@ -36,11 +37,11 @@ async def create_dhcp_static_lease(self, static_lease):
"""
return await self._access.post("dhcp/static_lease/", static_lease)

async def delete_dhcp_static_lease(self, lease_id):
async def delete_dhcp_static_lease(self, lease_id: str) -> Dict[str, bool]:
"""
Delete dhcp static lease
"""
await self._access.delete(f"dhcp/static_lease/{lease_id}")
return await self._access.delete(f"dhcp/static_lease/{lease_id}") # type: ignore

async def edit_dhcp_static_lease(self, lease_id, static_lease):
"""
Expand Down Expand Up @@ -83,25 +84,3 @@ async def get_dhcp_static_leases(self):
Get the list of DHCP static leases
"""
return await self._access.get("dhcp/static_lease/")

# TODO: remove
async def get_dynamic_dhcp_lease(self):
"""
Get the list of DHCP dynamic leases
"""
logger.warning(
"Using deprecated call get_dynamic_dhcp_lease, please use "
"get_dhcp_dynamic_leases instead"
)
return await self.get_dhcp_dynamic_leases()

# TODO: remove
async def get_static_dhcp_lease(self):
"""
Get the list of DHCP static leases
"""
logger.warning(
"Using deprecated call get_static_dhcp_lease, please use "
"get_dhcp_static_leases instead"
)
return await self.get_dhcp_static_leases()
Loading

0 comments on commit 4c8c876

Please sign in to comment.