This repository has been archived by the owner on Mar 29, 2024. It is now read-only.

add more flow control to to_spectrum #19

Open · wants to merge 4 commits into master
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,4 @@
# Pandas Extension Changelog

## 0.5.0
* Made df an optional parameter in to_spectrum. Schema lookup can now call out to NES schemas, falling back to inferring the schema from the df.
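Below is a rough sketch (not part of this diff) of the flow control the changelog entry describes: column DDL can come from the NES schema registry when a stream name is available, and otherwise falls back to inferring the schema from the DataFrame. The resolve_columns helper is hypothetical and only illustrates the branching; the PR wires this logic into to_spectrum and nes_to_spectrum instead.

```python
import pandas as pd

from pandas_ext.sqla_utils import schema_from_df, schema_from_registry


def resolve_columns(df: pd.DataFrame = None, stream: str = '') -> str:
    """Hypothetical helper: pick a schema source based on what was provided."""
    if stream:
        # Registry lookup; needs NES_SCHEMAS_ENDPOINT (and NES_SCHEMAS_KEY) in the env.
        return schema_from_registry(stream)
    if df is not None:
        # Fall back to inferring the column types from the DataFrame's dtypes.
        return schema_from_df(df)
    raise ValueError('Provide either a stream name or a DataFrame.')
```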
2 changes: 1 addition & 1 deletion pandas_ext/__init__.py
@@ -1,5 +1,5 @@
"""Versioning kept here."""
__version__ = '0.4.9'
__version__ = '0.5.0'
__license__ = "MIT"

__title__ = "pandas_ext"
175 changes: 103 additions & 72 deletions pandas_ext/amazon_spectrum.py
@@ -1,10 +1,13 @@
"""Tools for creating Amazon Spectrum schemas."""
from inspect import cleandoc
from os import getenv

import pandas as pd
import requests

from pandas_ext.common.utils import today
from pandas_ext.parquet import to_parquet
from pandas_ext.sqla_utils import schema_from_df
from pandas_ext.sqla_utils import schema_from_df, schema_from_registry


def _get_file_format_serde(file_format: str) -> str:
@@ -18,17 +21,24 @@ def _build_s3_stream_path(
stream,
file_format,
partition,
partition_value
partition_value,
has_partition
):
return (f's3://{bucket}/{stream}/ext={file_format}/'
f'{partition}={partition_value}/{stream}.snappy'
).lower()
partitioned_by = (
f'{partition}={partition_value}'
if has_partition
else ''
)
return cleandoc(f"""
s3://{bucket}/{stream}/ext={file_format}/{partitioned_by}{stream}
/{stream}.snappy"""
).lower()

Review thread on the f-string above:
Reviewer: Is there an extra {stream} after {partitioned_by} in this fstring?
richiverse (Author): yes this is for the filename itself
richiverse (Author, Mar 11, 2019): It shows up in both the s3 prefix and the filename itself
Reviewer: ok, if this isn't a typo, I'll approve the PR.


def _create_schema_alias_statement(
schema_alias: str,
schema: str,
table: str
schema_alias: str,
schema: str,
table: str
) -> str:
return cleandoc(f"""
CREATE VIEW "{schema_alias}"."{table}" AS
@@ -39,18 +49,21 @@ def _create_schema_alias_statement(


def _create_partition_statement(
schema: str,
bucket: str,
stream: str,
file_format: str='parquet',
partition: str='dt',
partition_value: str=''
schema: str,
bucket: str,
stream: str,
file_format: str = 'parquet',
partition: str = 'dt',
partition_value: str = ''
) -> str:
partition_value = (
partition_value if partition_value else
today()
)
s3_path = f's3://{bucket}/{stream}/ext={file_format}/{partition}={partition_value}/'.lower()
s3_path = (
f's3://{bucket}/{stream}/ext={file_format}/'
f'{partition}={partition_value}/'
).lower()
return cleandoc(f"""
ALTER TABLE "{schema}"."{stream}_{file_format}"
ADD PARTITION ({partition}='{partition_value}')
@@ -60,8 +73,8 @@ def _external_table_exists_statement(


def _external_table_exists_statement(
schema: str,
table: str
schema: str,
table: str
) -> str:

return cleandoc(f"""
@@ -73,45 +86,69 @@ def _create_external_table_statement(


def _create_external_table_statement(
schema: str,
table: str,
columns: str,
bucket: str,
stream: str,
file_format: str='parquet',
partition: str='dt',
partition_type: str='date',
partition_value: str=''
schema: str,
table: str,
columns: str,
bucket: str,
stream: str,
file_format: str = 'parquet',
partition: str = 'dt',
partition_type: str = 'date',
partition_value: str = '',
has_partition: bool = True
) -> str:
serde = _get_file_format_serde(file_format)
upper_file_format = file_format.upper()
s3_path = f's3://{bucket}/{stream}/ext={file_format}/'.lower()
partitioned_by = (
f'PARTITIONED BY ({partition} {partition_type})'
if has_partition
else ''
)

return cleandoc(f"""
CREATE EXTERNAL TABLE "{schema}"."{table}_{file_format}" (
{columns}
)PARTITIONED BY ({partition} {partition_type})
)
{partitioned_by}
ROW FORMAT SERDE '{serde}'
STORED AS {upper_file_format}
LOCATION '{s3_path}'
;"""
)
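A quick, hedged illustration of the new has_partition flag: when it is False, the PARTITIONED BY clause is left out of the generated DDL. The argument values are made up for the example; _create_external_table_statement is the private helper defined just above.

```python
# Example values are assumptions, not taken from the PR.
ddl = _create_external_table_statement(
    schema='analytics',
    table='events',
    columns='"id" BIGINT,\n"payload" TEXT',
    bucket='my-bucket',
    stream='events',
    has_partition=False,  # drops the PARTITIONED BY (...) clause
)
print(ddl)
# Roughly: CREATE EXTERNAL TABLE "analytics"."events_parquet" (...) ROW FORMAT SERDE ...
#          STORED AS PARQUET LOCATION 's3://my-bucket/events/ext=parquet/';
```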

def nes_to_spectrum(
s3_prefix: str,
table_name: str,
external_schema: str,
s3_bucket: str,
):
"""Represents daily files that will automatically get a date partition.

Here we:
Infer schema from the schema registry.
Create the schema in our schema registry.
Use the schema registry to create the table, if necessary.
Add partitions as necessary.
Subscribe to adding daily partitions, if necessary.

NOTE: This function is not ready for public consumption at this time.
"""

def to_spectrum(
df: pd.DataFrame,
table: str,
schema: str,
bucket: str,
schema_alias: str='',
stream: str='',
file_format: str='parquet',
partition: str='dt',
partition_type: str='date',
partition_value: str='',
conn: str='',
verbose: bool=True,
**kwargs
df: pd.DataFrame,
table: str,
external_schema: str,
bucket: str,
schema_alias: str = '',
s3_prefix: str = '',
file_format: str = 'parquet',
partition: str = 'dt',
partition_type: str = 'date',
partition_value: str = '',
conn: str = '',
has_partition: bool = True,
**kwargs
) -> str:
"""Sends your dataframe to Spectrum for use in Athena/Redshift/Looker/etc

@@ -120,71 +157,65 @@ def to_spectrum(

df: pandas Dataframe
table: table name as it appears in Spectrum
schema: external table schema
external_schema: external table schema
bucket: s3 bucket
schema_alias: If you want to create an alternate path to your schema
stream: Defaults to table if not provided.
s3_prefix: Defaults to table if not provided.
file_format: Defaults to parquet and may expand to avro.
partition: Defaults to dt, which is short for the date.
partition_type: The data type declaration of the partition value.
partition_value: Defaults to today's date.
conn: A valid sqlalchemy string to connect to spectrum.
kwargs: kwargs you want to pass to `to_parquet()` call
has_partition: When False, omits the partition from the generated DDL and S3 path.
"""

s3_prefix = s3_prefix if s3_prefix else table
columns = schema_from_df(df)
stream = stream if stream else table

external_table_statement = _create_external_table_statement(
schema=schema,
schema=external_schema,
table=table,
columns=columns,
bucket=bucket,
stream=stream,
stream=s3_prefix,
file_format=file_format,
partition=partition,
partition_type=partition_type
partition_type=partition_type,
has_partition=has_partition
)
alias_statement = (
'' if not schema_alias else
_create_alias_statement(schema_alias, schema, table)
)
partition_value = (
partition_value if partition_value else
today()
)
partition_statement = _create_partition_statement(
schema=schema,
bucket=bucket,
stream=stream,
partition=partition,
partition_value=partition_value
_create_schema_alias_statement(schema_alias, external_schema, table)
)
if has_partition:
partition_value = (
partition_value if partition_value else
today()
)
partition_statement = _create_partition_statement(
schema=external_schema,
bucket=bucket,
stream=s3_prefix,
partition=partition,
partition_value=partition_value
)
else:
partition_statement = ''
create_statement = (''.join([
external_table_statement,
alias_statement,
partition_statement
]))
print(create_statement)

s3_path = _build_s3_stream_path(
bucket,
stream,
file_format,
partition,
partition_value)
if verbose:
print(f'SELECT COUNT(*) FROM "{schema}"."{table}_{file_format}";')
print(f"df_{table} = read_parquet('{s3_path}')")

if conn:
to_parquet(df, s3_path, **kwargs)
from sqlalchemy import create_engine
engine = create_engine(conn, execution_options=dict(autocommit=True))

schema_table_statement = _external_table_exists_statement(
schema, table)
external_schema, table)
table_exists_data = pd.read_sql_query(schema_table_statement, conn)
if not len(table_exists_data):
if len(table_exists_data) == 0:
# table doesn't exist so create it.
engine.execute(create_statement)
engine.execute(partition_statement)
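For reference, a hedged example of calling to_spectrum with the renamed and added parameters from this diff (external_schema, s3_prefix, has_partition). All values below are made up; supplying conn would additionally write the parquet file and execute the DDL, per the body above.

```python
import pandas as pd

from pandas_ext.amazon_spectrum import to_spectrum

df = pd.DataFrame({'id': [1, 2], 'amount': [3.50, 7.25]})

to_spectrum(
    df,
    table='orders',
    external_schema='spectrum_schema',
    bucket='my-data-bucket',
    s3_prefix='orders',    # defaults to the table name when omitted
    has_partition=False,   # omit the dt partition from both the DDL and the S3 path
    # conn='redshift+psycopg2://user:pass@host:5439/db',  # assumed connection string format
)
```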
34 changes: 29 additions & 5 deletions pandas_ext/sqla_utils.py
@@ -39,18 +39,23 @@
DOUBLE PRECISION FLOAT8, FLOAT Double precision floating-point number
BOOLEAN BOOL Logical Boolean (true/false)
CHAR CHARACTER, NCHAR, BPCHAR Fixed-length character string
VARCHAR CHARACTER VARYING, NVARCHAR, TEXT Variable-length character string with a user-defined limit
VARCHAR CHARACTER VARYING, NVARCHAR, TEXT Variable-length character string
with a user-defined limit
DATE Calendar date (year, month, day)
TIMESTAMP TIMESTAMP WITHOUT TIME ZONE Date and time (without time zone)
TIMESTAMPTZ TIMESTAMP WITH TIME ZONE Date and time (with time zone)
TIMESTAMPTZ TIMESTAMP WITH TIME ZONE Date and time (with time zone)
"""
from os import getenv

import numpy as np
import pandas as pd
import requests

from pandas.api.types import pandas_dtype
import numpy as np


def dtype_to_spectrum(dtype):
"""convert pandas dtype to equivalent redshift spectrum schema column value."""
"""convert pandas dtype to equivalent redshift spectrum schema column."""
try:
return {
pandas_dtype(np.float64): 'FLOAT8',
@@ -64,10 +69,29 @@ def dtype_to_spectrum(dtype):
return 'TEXT'


def schema_from_df(df: pd.DataFrame):
def schema_from_df(df: pd.DataFrame) -> str:
"""Get schema from a pandas DataFrame"""
dtype_map = df.dtypes.to_dict()
return ',\n'.join(
[f'"{col}" {dtype_to_spectrum(dtype)}'
for col, dtype in dtype_map.items()
]
)
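As a small illustration (assuming the pandas import at the top of this module), schema_from_df emits one '"column" TYPE' line per DataFrame column using the dtype mapping above:

```python
df = pd.DataFrame({'price': [1.5, 2.5]})
print(schema_from_df(df))
# "price" FLOAT8   (float64 maps to FLOAT8 in dtype_to_spectrum)
```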


def schema_from_registry(stream: str) -> str:
"""Using NES schemas endpoint, pull the latest schema from the registry.

NES schemas repo coming soon!
"""
endpoint = getenv('NES_SCHEMAS_ENDPOINT')
if not endpoint:
raise ValueError('NES_SCHEMAS_ENDPOINT required in env vars.')

key = getenv('NES_SCHEMAS_KEY')
url = f'{endpoint}/schema/{stream}/version/latest'
response = requests.get(url, headers={'x-api-key': key}).json()
parsed = response[0]['columns']
return ',\n'.join(
[f'''"{dct['name']}" {dct['type']}''' for dct in parsed]
)
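A hedged illustration of the registry payload this function appears to expect; the exact response shape is an assumption inferred only from the parsing above (a list whose first element carries a columns list of name/type pairs):

```python
example_response = [
    {'columns': [
        {'name': 'id', 'type': 'BIGINT'},
        {'name': 'created_at', 'type': 'TIMESTAMP'},
    ]}
]
columns = ',\n'.join(
    f'''"{dct['name']}" {dct['type']}''' for dct in example_response[0]['columns']
)
# columns == '"id" BIGINT,\n"created_at" TIMESTAMP'
```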
3 changes: 3 additions & 0 deletions setup.cfg
@@ -23,3 +23,6 @@ license-file = LICENSE

[bdist_wheel]
universal=1

[mypy]
ignore_missing_imports = True