Skip to content

Commit

Permalink
Merge pull request #14 from enerBit/pydantic-settings
Browse files Browse the repository at this point in the history
Pydantic settings
Make mypy happy
  • Loading branch information
elpablete authored Nov 9, 2024
2 parents a98ee90 + d278741 commit c4f2e27
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 118 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ dependencies = [
"orjson>=3.10.10",
"pydantic>=2.9.2",
"rich>=13.9.4",
"urlpath>=1.2.0",
"typer>=0.12.5",
"tzdata>=2024.2",
"truststore>=0.10.0",
"pydantic-settings>=2.6.1",
]

[project.scripts]
Expand Down
2 changes: 1 addition & 1 deletion src/enerbitdso/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.15
0.1.16
28 changes: 24 additions & 4 deletions src/enerbitdso/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import operator
import pathlib
import sys
from typing import Annotated, TypedDict

import pydantic
import typer
import zoneinfo
from rich.console import Console
Expand All @@ -17,7 +19,22 @@
logger = logging.getLogger(__name__)

DATE_FORMATS = ["%Y-%m-%d", "%Y%m%d"]
DATE_PARTS_TO_START_DAY = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}


class DateParts(TypedDict, total=False):
hour: int
minute: int
second: int
microsecond: int


DATE_PARTS_TO_START_DAY: DateParts = {
"hour": 0,
"minute": 0,
"second": 0,
"microsecond": 0,
}

TZ_INFO = zoneinfo.ZoneInfo("America/Bogota")


Expand All @@ -41,9 +58,12 @@ def today():

@usages.command()
def fetch(
api_base_url: str = typer.Option(..., envvar="ENERBIT_API_BASE_URL"),
api_username: str = typer.Option(..., envvar="ENERBIT_API_USERNAME"),
api_password: str = typer.Option(..., envvar="ENERBIT_API_PASSWORD"),
api_base_url: Annotated[str, typer.Option(..., envvar="ENERBIT_API_BASE_URL")],
api_username: Annotated[str, typer.Option(..., envvar="ENERBIT_API_USERNAME")],
api_password: Annotated[
pydantic.SecretStr,
typer.Option(parser=pydantic.SecretStr, envvar="ENERBIT_API_PASSWORD"),
],
since: dt.datetime = typer.Option(
yesterday,
formats=DATE_FORMATS,
Expand Down
12 changes: 8 additions & 4 deletions src/enerbitdso/config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import pydantic
import pydantic_settings


class Config(pydantic.BaseSettings):
DAY_FILENAME_TEMPLATE: str = "STAR_{operation_day:%Y%m%d}_{serial}_Day_{now:%Y%m%d%H%M%S}.xml"
INTERVAL_FILENAME_TEMPLATE: str = "STAR_{operation_day:%Y%m%d}_{serial}_Interval_{now:%Y%m%d%H%M%S}.xml"
class Config(pydantic_settings.BaseSettings):
DAY_FILENAME_TEMPLATE: str = (
"STAR_{operation_day:%Y%m%d}_{serial}_Day_{now:%Y%m%d%H%M%S}.xml"
)
INTERVAL_FILENAME_TEMPLATE: str = (
"STAR_{operation_day:%Y%m%d}_{serial}_Interval_{now:%Y%m%d%H%M%S}.xml"
)


settings = Config()
25 changes: 16 additions & 9 deletions src/enerbitdso/enerbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
import logging
import math
import ssl
import urllib
import urllib.parse

import httpx
import pydantic
import truststore
import urlpath

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -56,8 +57,11 @@ def get_auth_token(base_url, username, password):
return token


def get_client(base_url, username, password):
url = str(urlpath.URL(base_url))
def get_client(
base_url: str, username: str, password: pydantic.SecretStr
) -> httpx.Client:
url_parse: urllib.parse.ParseResult = urllib.parse.urlparse(base_url)
url = url_parse.geturl()
token = get_auth_token(url, username, password)
auth = {"Authorization": f"Bearer {token}"}
return httpx.Client(base_url=url, headers=auth, timeout=TIMEOUT, verify=SSL_CONTEXT)
Expand Down Expand Up @@ -86,13 +90,16 @@ def get_schedule_usage_records(
) -> list[ScheduleUsageRecord]:
path = "/measurements/schedules/usages"
params = {
"since": since,
"until": until,
"since": since.isoformat(),
"until": until.isoformat(),
"frt-code": frt_code,
"period-string": "hour",
"period-number": 1,
"period-number": "1",
}
response = client.get(path, params=params)
response = client.get(
path,
params=params,
)
try:
response.raise_for_status()
except httpx.HTTPStatusError as e:
Expand All @@ -112,8 +119,8 @@ def get_schedule_measurement_records(
) -> list[ScheduleMeasurementRecord]:
path = "/measurements/schedules/"
params = {
"since": since,
"until": until,
"since": since.isoformat(),
"until": until.isoformat(),
"frt-code": frt_code,
}
response = client.get(path, params=params)
Expand Down
14 changes: 7 additions & 7 deletions src/enerbitdso/formats.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import csv
import io
import typing

import orjson
import pydantic
Expand All @@ -8,23 +9,22 @@
def as_json(records: list[pydantic.BaseModel]) -> io.StringIO:
content = orjson.dumps([r.model_dump() for r in records])
res = io.BytesIO(content)
wrapper = io.TextIOWrapper(res, encoding="utf-8")
return wrapper

return io.StringIO(res.getvalue().decode("utf-8"))

def as_csv(records: list[pydantic.BaseModel], header: bool) -> io.StringIO:

def as_csv(records: typing.Sequence[pydantic.BaseModel], header: bool) -> io.StringIO:
res = io.StringIO(newline="")
fields = records[0].model_fields.keys()
content_lines = [r.model_dump() for r in records]
writer = csv.DictWriter(res, fields, lineterminator="\n")
if header:
writer.writeheader()
for i in content_lines:
writer.writerow(i)
for i in records:
writer.writerow(i.model_dump())
return res


def as_jsonl(records: list[pydantic.BaseModel]) -> io.StringIO:
def as_jsonl(records: typing.Sequence[pydantic.BaseModel]) -> io.StringIO:
res = io.StringIO()
for i in records:
res.write(i.model_dump_json())
Expand Down
66 changes: 36 additions & 30 deletions tests/mocked_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def create_mocked_schedules(
frt_code: str, since: dt.datetime, until: dt.datetime
) -> None:
mocked_schedules.clear()
dt_range = pd.core.indexes.datetimes.DatetimeIndex = (
dt_range: pd.core.indexes.datetimes.DatetimeIndex = (
pd.core.indexes.datetimes.date_range(
since,
until,
Expand All @@ -37,34 +37,38 @@ def create_mocked_schedules(
reactive_energy_imported += round(random.randint(0, 100))
reactive_energy_exported += round(random.randint(0, 100))
mocked_usages.append(
{
"frt_code": str(frt_code),
"meter_serial": str(meter_serial),
"time_local_utc": item["start"],
"voltage_multiplier": 1,
"current_multiplier": 1,
"active_energy_imported": active_energy_imported,
"active_energy_exported": active_energy_exported,
"reactive_energy_imported": reactive_energy_imported,
"reactive_energy_exported": reactive_energy_exported,
}
ScheduleUsageRecord.model_validate(
{
"frt_code": str(frt_code),
"meter_serial": str(meter_serial),
"time_local_utc": item["start"],
"voltage_multiplier": 1,
"current_multiplier": 1,
"active_energy_imported": active_energy_imported,
"active_energy_exported": active_energy_exported,
"reactive_energy_imported": reactive_energy_imported,
"reactive_energy_exported": reactive_energy_exported,
}
)
)


def get_mocked_schedules(ebclient, frt_code, since, until) -> list[ScheduleUsageRecord]:
def get_mocked_schedules(
ebclient, frt_code, since, until
) -> list[ScheduleMeasurementRecord]:
filtered_mocked_usages = [
schedules
for schedules in mocked_schedules
if schedules["time_local_utc"] >= since
and schedules["time_local_utc"] <= until
and schedules["frt_code"] == frt_code
if schedules.time_local_utc >= since
and schedules.time_local_utc <= until
and schedules.frt_code == frt_code
]
return filtered_mocked_usages


def create_mocked_usages(frt_code: str, since: dt.datetime, until: dt.datetime) -> None:
mocked_usages.clear()
dt_range = pd.core.indexes.datetimes.DatetimeIndex = (
dt_range: pd.core.indexes.datetimes.DatetimeIndex = (
pd.core.indexes.datetimes.date_range(
since,
until - dt.timedelta(hours=1),
Expand All @@ -78,25 +82,27 @@ def create_mocked_usages(frt_code: str, since: dt.datetime, until: dt.datetime)
meter_serial = "".join(random.choice(letters) for i in range(10))
for index, item in enumerate(list_interval):
mocked_usages.append(
{
"frt_code": str(frt_code),
"meter_serial": str(meter_serial),
"time_start": item["start"],
"time_end": item["start"] + dt.timedelta(hours=1),
"active_energy_imported": round(random.uniform(0, 3), 2),
"active_energy_exported": round(random.uniform(0, 3), 2),
"reactive_energy_imported": round(random.uniform(0, 3), 2),
"reactive_energy_exported": round(random.uniform(0, 3), 2),
}
ScheduleUsageRecord.model_validate(
{
"frt_code": str(frt_code),
"meter_serial": str(meter_serial),
"time_start": item["start"],
"time_end": item["start"] + dt.timedelta(hours=1),
"active_energy_imported": round(random.uniform(0, 3), 2),
"active_energy_exported": round(random.uniform(0, 3), 2),
"reactive_energy_imported": round(random.uniform(0, 3), 2),
"reactive_energy_exported": round(random.uniform(0, 3), 2),
}
)
)


def get_mocked_usages(ebclient, frt_code, since, until) -> list[ScheduleUsageRecord]:
filtered_mocked_usages = [
usages
for usages in mocked_usages
if usages["time_start"] >= since
and usages["time_end"] <= until
and usages["frt_code"] == frt_code
if usages.time_start >= since
and usages.time_end <= until
and usages.frt_code == frt_code
]
return filtered_mocked_usages
Loading

0 comments on commit c4f2e27

Please sign in to comment.