Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add metadata object #182

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Note: This tap currently does not support incremental state.
| files | False | None | An array of csv file stream settings. |
| csv_files_definition| False | None | A path to the JSON file holding an array of file settings. |
| add_metadata_columns| False | False | When True, add the metadata columns (`_sdc_source_file`, `_sdc_source_file_mtime`, `_sdc_source_lineno`) to output. |
| add_metadata_dict| False | False | When True, add a `metadata` object (with `source` and `time_extracted` keys) to each output record. |

A full list of supported settings and capabilities is available by running: `tap-csv --about`

Expand Down
20 changes: 18 additions & 2 deletions tap_csv/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
import csv
import os
from datetime import datetime, timezone
from typing import Iterable, List, Optional
from typing import Iterable, List, Optional, Dict

from singer_sdk import typing as th
from singer_sdk.streams import Stream

SDC_SOURCE_FILE_COLUMN = "_sdc_source_file"
SDC_SOURCE_LINENO_COLUMN = "_sdc_source_lineno"
SDC_SOURCE_FILE_MTIME_COLUMN = "_sdc_source_file_mtime"

METADATA_COLUMN = "metadata"

class CSVStream(Stream):
"""Stream class for CSV streams."""
Expand Down Expand Up @@ -48,6 +48,10 @@ def get_records(self, context: Optional[dict]) -> Iterable[dict]:
if self.config.get("add_metadata_columns", False):
row = [file_path, file_last_modified, file_lineno] + row

if self.config.get("add_metadata_dict", False):
metadata_dict={"source": file_path, "time_extracted": datetime.utcnow()}
row = [metadata_dict] + row

yield dict(zip(self.header, row))

def _get_recursive_file_paths(self, file_path: str) -> list:
Expand Down Expand Up @@ -152,7 +156,19 @@ def schema(self) -> dict:
th.Property(SDC_SOURCE_FILE_MTIME_COLUMN, th.DateTimeType)
)
properties.append(th.Property(SDC_SOURCE_LINENO_COLUMN, th.IntegerType))

# If enabled, add file's metadata to output
if self.config.get("add_metadata_dict", False):
header = [
METADATA_COLUMN,
] + header

t = th.ObjectType(
th.Property("source", th.StringType),
th.Property("time_extracted", th.StringType),
additional_properties=False,
)
properties.append(th.Property(METADATA_COLUMN, t))
# Cache header for future use
self.header = header

Expand Down
9 changes: 9 additions & 0 deletions tap_csv/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ class TapCSV(Tap):
"`_sdc_source_file_mtime`, `_sdc_source_lineno`) to output."
),
),
th.Property(
"add_metadata_dict",
th.BooleanType,
required=False,
default=False,
description=(
"When True, adds basic metadata as dict"
),
),
).to_dict()

@classproperty
Expand Down