Skip to content

Commit

Permalink
refactor(api-python-sdk): fix type issues
Browse files Browse the repository at this point in the history
  • Loading branch information
HormCodes committed Mar 8, 2024
1 parent 552859b commit 2cda6f7
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 92 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ jobs:
- name: Check code style
run: poetry run pylint airthings_sdk

# - name: Check types
# run: poetry run mypy airthings_sdk
- name: Check types
run: poetry run mypy airthings_sdk
193 changes: 103 additions & 90 deletions api/python/airthings_sdk/parser.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Airthings API data handler."""
"""Module providing an Airthings API SDK."""

import logging
import time
from dataclasses import dataclass, field
from typing import List, Optional
from typing import List, Optional, cast

from httpx import AsyncClient, TimeoutException

Expand All @@ -12,17 +12,18 @@
from airthings_api_client.api.device import get_devices
from airthings_api_client.api.sensor import get_multiple_sensors
from airthings_api_client.errors import UnexpectedStatus
from airthings_api_client.models import Error, AccountResponse
from airthings_api_client.models import Error, SensorResponseType0
from airthings_api_client.models.device_response import DeviceResponse
from airthings_api_client.models.get_multiple_sensors_unit import GetMultipleSensorsUnit
from airthings_api_client.models.sensor_response_type_0 import SensorResponseType0
from airthings_api_client.models.sensors_response import SensorsResponse
from airthings_api_client.types import Unset, UNSET

AUTH_URL = "https://accounts-api.airthings.com/v1/token"
AUTH_URL = "https://accounts-api.airthings.com"
API_URL = "https://consumer-api.airthings.com"

logger = logging.getLogger(__name__)


@dataclass
class AirthingsSensor:
"""Representation of Airthings device sensor."""
Expand All @@ -33,13 +34,17 @@ class AirthingsSensor:

@classmethod
def init_from_sensor_response(
cls, sensor_response: SensorResponseType0
) -> "AirthingsSensor":
cls, sensor_response: SensorResponseType0 | None | Unset
):
"""Create an AirthingsSensor from a SensorResponseType0"""

if sensor_response is None or isinstance(sensor_response, Unset):
return None

return cls(
sensor_type=sensor_response.sensor_type,
value=sensor_response.value,
unit=sensor_response.unit,
sensor_type=cast(str, sensor_response.sensor_type),
value=cast(float | int, sensor_response.value),
unit=cast(str, sensor_response.unit),
)


Expand All @@ -48,9 +53,9 @@ class AirthingsDevice:
"""Representation of an Airthings device"""

serial_number: str
name: str
home: str
type: str
name: str
home: Optional[str]
recorded: Optional[str]
sensors: list[AirthingsSensor] = field(default_factory=list)

Expand All @@ -59,36 +64,35 @@ def init_from_device_response(
cls, device_response: DeviceResponse, sensors_response: SensorsResponse
) -> "AirthingsDevice":
"""Create an AirthingsDevice from a DeviceResponse and a SensorsResponse"""
sensors = [
AirthingsSensor.init_from_sensor_response(sensor)
for sensor in sensors_response.sensors
]
sensors.append(
AirthingsSensor(
sensor_type="battery",
value=sensors_response.battery_percentage,
unit="%",
)

mapped = map(
AirthingsSensor.init_from_sensor_response, sensors_response.sensors or []
)
filtered = list(filter(lambda sensor: sensor is not None, mapped))

if sensors_response.battery_percentage is not None:
filtered.append(
AirthingsSensor(
sensor_type="battery",
value=cast(int, sensors_response.battery_percentage),
unit="%",
)
)
return cls(
serial_number=device_response.serial_number,
name=device_response.name,
home=device_response.home,
type=device_response.type,
recorded=sensors_response.recorded,
sensors=sensors,
serial_number=cast(str, device_response.serial_number),
name=cast(str, device_response.name),
type=cast(str, device_response.type),
home=cast(str | None, device_response.home),
recorded=cast(str | None, sensors_response.recorded),
sensors=filtered,
)


class AirthingsToken:
"""Representation of an Airthings API token."""

value: Optional[str]
_expires: Optional[int]

def __init__(self):
self.value = None
self._expires = None
value: Optional[str] = None
_expires: Optional[int] = None

def set_token(self, access_token: str, expires_in: int):
"""Set the token and its expiration time."""
Expand All @@ -97,7 +101,11 @@ def set_token(self, access_token: str, expires_in: int):

def is_valid(self) -> bool:
"""Check if the token is valid."""
return self.value and self._expires and self._expires > (int(time.time()) + 20)
return (
self.value is not None
and self._expires is not None
and self._expires > (int(time.time()) + 20)
)

def as_header(self) -> dict[str, str]:
"""Return the token as a header."""
Expand All @@ -107,23 +115,22 @@ def as_header(self) -> dict[str, str]:
class Airthings:
"""Representation of Airthings API data handler."""

_access_token: Optional[str] = None
_client_id: str
_client_secret: str
_unit: GetMultipleSensorsUnit
_access_token: AirthingsToken
devices: List[AirthingsDevice]

_authApiClient: Client
_apiClient: AuthenticatedClient
_auth_api_client: Client
_api_client: AuthenticatedClient

def __init__(
self,
client_id: str,
client_secret: str,
is_metric: bool,
websession: Optional[AsyncClient] = None,
) -> "Airthings":
):
"""Init Airthings data handler."""
self._client_id = client_id
self._client_secret = client_secret
Expand All @@ -133,24 +140,24 @@ def __init__(
else GetMultipleSensorsUnit.IMPERIAL
)
self._access_token = AirthingsToken()
self._devices = {}
self._devices: dict[str, AirthingsDevice] = {}

self._authClient = Client(base_url="https://accounts-api.airthings.com")
self._apiClient = AuthenticatedClient(
self._auth_api_client = Client(base_url=AUTH_URL)
self._api_client = AuthenticatedClient(
base_url=API_URL, token="invalid_token" # Should authenticate before using
)
if websession:
self._authClient.set_async_httpx_client(websession)
self._apiClient.set_async_httpx_client(websession)
self._auth_api_client.set_async_httpx_client(websession)
self._api_client.set_async_httpx_client(websession)

def _verify_auth(self):
"""Authenticate to Airthings API and set the access token and updates internal api clients."""
def verify_auth(self):
"""Make sure the access token is valid. If not, fetch a new one."""

if self._access_token.is_valid():
return

auth_response = self._authClient.get_httpx_client().request(
url=AUTH_URL,
auth_response = self._auth_api_client.get_httpx_client().request(
url=AUTH_URL + "/v1/token",
method="POST",
data={
"grant_type": "client_credentials",
Expand All @@ -164,58 +171,59 @@ def _verify_auth(self):
access_token=access_token,
expires_in=int(auth_response.json()["expires_in"]),
)
print(self._access_token.value)
self._apiClient.token = self._access_token.value
self._api_client.token = self._access_token.value
else:
raise ValueError("No access token found")

def update_devices(self) -> Optional[dict[str, AirthingsDevice]]:
def update_devices(self) -> dict[str, AirthingsDevice]:
"""Update devices and sensors from Airthings API. Return a dict of devices."""
self._verify_auth()
self.verify_auth()

accounts = self.fetch_all_accounts()

if not accounts:
logging.error("No accounts found")
return None
account_ids = self._fetch_all_accounts_ids()

res = {}
logging.info("Accounts found: %s", len(accounts))
for account in accounts:
logging.info("Account: %s", account)
logging.info("Accounts found: %s", len(account_ids))
for account_id in account_ids:
logging.info("Account: %s", account_id)

devices = self.fetch_all_devices(account_id=account.id)
devices = self._fetch_all_devices(account_id=account_id)

logging.info("%s devices found in account %s", len(devices), account.id)
logging.info("%s devices found in account %s", len(devices), account_id)
logging.info("Devices: %s", devices)

sensors = self.fetch_all_device_sensors(
account_id=account.id, unit=self._unit
sensors = self._fetch_all_device_sensors(
account_id=account_id, unit=self._unit
)
if not sensors:
logging.error("No sensors found in account %s", account.id)
logging.error("No sensors found in account %s", account_id)
break
logging.info("%s sensors found in account %s", len(sensors), account.id)
logging.info("%s sensors found in account %s", len(sensors), account_id)
logging.info("Sensors: %s,", sensors)
logging.info("Pages: %s", sensors)

for device in devices:
for sensor in sensors:
if device.serial_number == sensor.serial_number:
res[device.serial_number] = (
AirthingsDevice.init_from_device_response(device, sensor)
)
if device.serial_number != sensor.serial_number:
continue
res[cast(str, device.serial_number)] = (
AirthingsDevice.init_from_device_response(device, sensor)
)

logger.info("Mapped devices: %s", res)
return res

def fetch_all_accounts(self) -> List[AccountResponse]:
def _fetch_all_accounts_ids(self) -> List[str]:
"""Fetch accounts for the given client"""
try:
accounts_response = get_accounts_ids.sync_detailed(client=self._apiClient)
if accounts := accounts_response.parsed:
return accounts.accounts
return []
response = get_accounts_ids.sync_detailed(client=self._api_client).parsed

if response is None:
return []
return [
account.id
for account in (response.accounts or [])
if isinstance(account.id, str)
]
except UnexpectedStatus as e:
logging.error("Unexpected status while fetching accounts: %s", e)
return []
Expand All @@ -226,23 +234,25 @@ def fetch_all_accounts(self) -> List[AccountResponse]:
logging.error("Error while fetching accounts: %s", e)
return []

def fetch_all_devices(self, account_id: str) -> List[DeviceResponse]:
def _fetch_all_devices(self, account_id: str) -> List[DeviceResponse]:
"""Fetch devices for a given account"""
try:
sensors_response = get_devices.sync_detailed(
account_id=account_id,
client=self._apiClient,
)
logger.info("Device headers: %s", sensors_response.headers)
return sensors_response.parsed.devices
account_id=account_id, client=self._api_client
).parsed

if sensors_response is None or isinstance(sensors_response, Unset):
return []

return sensors_response.devices or []
except UnexpectedStatus as e:
logging.error("Unexpected status while fetching devices: %s", e)
return []
except TimeoutException as e:
logging.error("Timeout while fetching devices: %s", e)
return []

def fetch_all_device_sensors(
def _fetch_all_device_sensors(
self,
account_id: str,
page_number: int = 1,
Expand All @@ -252,23 +262,26 @@ def fetch_all_device_sensors(
try:
sensors_response = get_multiple_sensors.sync_detailed(
account_id=account_id,
client=self._apiClient,
client=self._api_client,
page_number=page_number,
unit=unit,
unit=unit or UNSET,
).parsed

if sensors_response is Error or sensors_response is None:
if sensors_response is None or isinstance(sensors_response, Error):
return []

device_sensors = sensors_response.results
device_sensors = sensors_response.results or []

if sensors_response.has_next is False:
return device_sensors

return device_sensors + self.fetch_all_device_sensors(
account_id=account_id,
page_number=page_number + 1,
unit=unit,
return (
self._fetch_all_device_sensors(
account_id=account_id,
page_number=page_number + 1,
unit=unit,
)
+ device_sensors
)
except UnexpectedStatus as e:
logging.error("Unexpected status while fetching sensors: %s", e)
Expand Down

0 comments on commit 2cda6f7

Please sign in to comment.