Skip to content

Commit

Permalink
new sources, new models, dlt
Browse files Browse the repository at this point in the history
  • Loading branch information
charlesphil committed Sep 17, 2024
1 parent 8276e9c commit 4a8785f
Show file tree
Hide file tree
Showing 15 changed files with 899 additions and 27 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@ data/duckdb
transform/target/
transform/dbt_packages/
transform/logs/

# dlt
extract/.dlt/secrets.toml
3 changes: 3 additions & 0 deletions extract/.dlt/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[runtime]
dlthub_telemetry = false
log_level = "INFO"
9 changes: 8 additions & 1 deletion extract/nhl_api.py → extract/legacy/do_not_use_nhl_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
"""Extracts and loads data from NHL-owned APIs."""
"""Extracts and loads data from NHL-owned APIs.
Please note that this script was created before I used dlt in this project.
dlt has since released their first stable version (1.0.0), making this script
unnecessary.
"""

import logging
import sys
Expand Down
138 changes: 138 additions & 0 deletions extract/nhl_apis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""Extracts and loads data from NHL-owned APIs."""

import logging
from typing import Generator

import dlt
from dlt.sources.helpers import requests
from dlt.sources.rest_api import (
RESTAPIConfig, # type: ignore[reportPrivateImportUsage]
rest_api_resources,
)

logger = logging.getLogger("dlt")


def main() -> None:
    """Entry point: run the NHL API extraction pipelines.

    Currently only the stats-API load is wired in; ``load_nhl_web_api``
    is defined below but not invoked here.
    """
    logger.info("Starting nhl_api pipeline.")
    load_nhl_stats_api()
    logger.info("Pipeline completed.")


@dlt.source
def nhl_stats_api() -> Generator:
    """dlt source for the NHL stats REST API (api.nhle.com/stats/rest/en).

    Yields four resources: ``teams`` and ``games`` (merged on ``id`` via the
    resource defaults) plus ``skaters`` and ``goalies`` summaries (merged on
    ``playerId``).
    """
    # Both player-summary endpoints take the same query parameters; an empty
    # cayenneExp and limit=-1 request the full, unfiltered result set.
    summary_params = {
        "cayenneExp": "",
        "limit": "-1",
        "isAggregate": "true",
        "isGame": "true",
    }

    config: RESTAPIConfig = {
        "client": {"base_url": "https://api.nhle.com/stats/rest/en/"},
        "resource_defaults": {
            "primary_key": "id",
            "write_disposition": "merge",
        },
        "resources": [
            {"name": "teams", "endpoint": {"path": "team"}},
            {"name": "games", "endpoint": {"path": "game"}},
            {
                "name": "skaters",
                "primary_key": "playerId",
                "endpoint": {
                    "path": "skater/summary",
                    "params": dict(summary_params),
                },
            },
            {
                "name": "goalies",
                "primary_key": "playerId",
                "endpoint": {
                    "path": "goalie/summary",
                    "params": dict(summary_params),
                },
            },
        ],
    }

    yield from rest_api_resources(config)


def load_nhl_stats_api() -> None:
    """Run the ``nhl_stats_api`` source into the local DuckDB sources file.

    Writes to dataset ``nhl_stats_api`` in ``../data/sources.duckdb`` and
    logs the resulting load info.
    """
    duckdb_destination = dlt.destinations.duckdb("../data/sources.duckdb")
    pipeline = dlt.pipeline(
        pipeline_name="nhl_apis",
        destination=duckdb_destination,
        dataset_name="nhl_stats_api",
    )
    logger.info(pipeline.run(nhl_stats_api()))


@dlt.source
def nhl_web_api() -> Generator:
    """dlt source intended for the NHL web API (api-web.nhle.com/v1).

    NOTE(review): the resource paths below (``team``, ``game``,
    ``skater/summary``, ``goalie/summary``) are copied verbatim from
    ``nhl_stats_api`` but pointed at a different base URL — confirm these
    endpoints actually exist on api-web.nhle.com, whose layout may differ
    from the stats API.
    """

    config: RESTAPIConfig = {
        "client": {"base_url": "https://api-web.nhle.com/v1/"},
        "resource_defaults": {
            "primary_key": "id",
            "write_disposition": "merge",
        },
        "resources": [
            {"name": "teams", "endpoint": {"path": "team"}},
            {"name": "games", "endpoint": {"path": "game"}},
            {
                "name": "skaters",
                "primary_key": "playerId",
                "endpoint": {
                    "path": "skater/summary",
                    "params": {
                        # Hard-coded to the 2023-24 season (unlike the stats
                        # source, which uses an empty filter).
                        "cayenneExp": "seasonId=20232024",
                        "limit": "-1",
                        "isAggregate": "true",
                        "isGame": "true",
                    },
                },
            },
            {
                "name": "goalies",
                "primary_key": "playerId",
                "endpoint": {
                    "path": "goalie/summary",
                    "params": {
                        "cayenneExp": "seasonId=20232024",
                        "limit": "-1",
                        "isAggregate": "true",
                        "isGame": "true",
                    },
                },
            },
        ],
    }

    yield from rest_api_resources(config)


def load_nhl_web_api() -> None:
    """Run the ``nhl_web_api`` source into the local DuckDB sources file.

    Writes to dataset ``nhl_web_api`` in ``../data/sources.duckdb`` and
    logs the resulting load info.
    """
    pipeline = dlt.pipeline(
        pipeline_name="nhl_apis",
        destination=dlt.destinations.duckdb("../data/sources.duckdb"),
        dataset_name="nhl_web_api",
    )

    # BUG FIX: this previously ran nhl_stats_api(), so the nhl_web_api
    # dataset was populated with stats-API data. Run the matching source.
    load_info = pipeline.run(nhl_web_api())
    logger.info(load_info)


if __name__ == "__main__":
main()
160 changes: 160 additions & 0 deletions extract/rest_api_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
from typing import Any, Optional

import dlt
from dlt.common.pendulum import pendulum
from dlt.sources.rest_api import (
RESTAPIConfig,
check_connection,
rest_api_resources,
rest_api_source,
)


@dlt.source(name="github")
def github_source(access_token: Optional[str] = dlt.secrets.value) -> Any:
# Create a REST API configuration for the GitHub API
# Use RESTAPIConfig to get autocompletion and type checking
config: RESTAPIConfig = {
"client": {
"base_url": "https://api.github.com/repos/dlt-hub/dlt/",
# we add an auth config if the auth token is present
"auth": (
{
"type": "bearer",
"token": access_token,
}
if access_token
else None
),
},
# The default configuration for all resources and their endpoints
"resource_defaults": {
"primary_key": "id",
"write_disposition": "merge",
"endpoint": {
"params": {
"per_page": 100,
},
},
},
"resources": [
# This is a simple resource definition,
# that uses the endpoint path as a resource name:
# "pulls",
# Alternatively, you can define the endpoint as a dictionary
# {
# "name": "pulls", # <- Name of the resource
# "endpoint": "pulls", # <- This is the endpoint path
# }
# Or use a more detailed configuration:
{
"name": "issues",
"endpoint": {
"path": "issues",
# Query parameters for the endpoint
"params": {
"sort": "updated",
"direction": "desc",
"state": "open",
# Define `since` as a special parameter
# to incrementally load data from the API.
# This works by getting the updated_at value
# from the previous response data and using this value
# for the `since` query parameter in the next request.
"since": {
"type": "incremental",
"cursor_path": "updated_at",
"initial_value": pendulum.today()
.subtract(days=30)
.to_iso8601_string(),
},
},
},
},
# The following is an example of a resource that uses
# a parent resource (`issues`) to get the `issue_number`
# and include it in the endpoint path:
{
"name": "issue_comments",
"endpoint": {
# The placeholder {issue_number} will be resolved
# from the parent resource
"path": "issues/{issue_number}/comments",
"params": {
# The value of `issue_number` will be taken
# from the `number` field in the `issues` resource
"issue_number": {
"type": "resolve",
"resource": "issues",
"field": "number",
}
},
},
# Include data from `id` field of the parent resource
# in the child data. The field name in the child data
# will be called `_issues_id` (_{resource_name}_{field_name})
"include_from_parent": ["id"],
},
],
}

yield from rest_api_resources(config)


def load_github() -> None:
    """Run the GitHub source into a local DuckDB destination and print the result."""
    github_pipeline = dlt.pipeline(
        pipeline_name="rest_api_github",
        destination="duckdb",
        dataset_name="rest_api_data",
    )
    info = github_pipeline.run(github_source())
    print(info)  # noqa: T201


def load_pokemon() -> None:
    """Load pokemon, berry, and location data from PokeAPI into DuckDB.

    Runs a connectivity check against the source before loading and prints
    the resulting load info.
    """
    pipeline = dlt.pipeline(
        pipeline_name="rest_api_pokemon",
        destination="duckdb",
        dataset_name="rest_api_data",
    )

    pokemon_source = rest_api_source(
        {
            "client": {
                "base_url": "https://pokeapi.co/api/v2/",
                # If you leave out the paginator, it will be inferred from the API:
                # "paginator": "json_link",
            },
            "resource_defaults": {
                "endpoint": {
                    "params": {
                        "limit": 1000,
                    },
                },
            },
            "resources": [
                "pokemon",
                "berry",
                "location",
            ],
        }
    )

    def check_network_and_authentication() -> None:
        """Probe the source and surface any connection problem."""
        (can_connect, error_msg) = check_connection(
            pokemon_source,
            "not_existing_endpoint",
        )
        if not can_connect:
            # BUG FIX: the error was previously swallowed with `pass`;
            # report it so connectivity problems are visible before the load.
            print(f"Connection check failed: {error_msg}")  # noqa: T201

    check_network_and_authentication()

    load_info = pipeline.run(pokemon_source)
    print(load_info)  # noqa: T201


if __name__ == "__main__":
load_github()
load_pokemon()
Loading

0 comments on commit 4a8785f

Please sign in to comment.