Skip to content

Commit

Permalink
added ability to provide schema in config (#14)
Browse files Browse the repository at this point in the history
Co-authored-by: Josh Lloyd <[email protected]>
  • Loading branch information
jlloyd-widen and Josh Lloyd authored Feb 18, 2022
1 parent dccbc08 commit c76d0f1
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 24 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ provided at the top-level will be the default values for each stream.:
Parameters that appear at the stream-level
will overwrite their top-level counterparts except where noted below:
- `name`: required: name of the stream.
- `path`: optional: the path appeneded to the `api_url`.
- `path`: optional: the path appended to the `api_url`.
- `params`: optional: an object of objects that provide the `params` in a `requests.get` method.
Stream level params will be merged with top-level params with stream level params overwriting
top-level params with the same key.
Expand All @@ -104,6 +104,9 @@ will overwrite their top-level counterparts except where noted below:
turned into a json string and processed in that format. This is also automatically done for any lists within the records; therefore,
records are not duplicated for each item in lists.
- `num_inference_keys`: optional: number of records used to infer the stream's schema. Defaults to 50.
- `scheam`: optional: A valid Singer schema or a path-like string that provides
the path to a `.json` file that contains a valid Singer schema. If provided,
the schema will not be inferred from the results of an api call.

## Pagination
Pagination is a complex topic as there is no real single standard, and many different implementations. Unless options are provided, both the request and results stype default to the `default`, which is the pagination style originally implemented.
Expand Down
53 changes: 42 additions & 11 deletions tap_rest_api_msdk/tap.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""rest-api tap class."""

import copy
import json
from typing import Any, List

import requests
Expand Down Expand Up @@ -143,6 +144,19 @@ class TapRestApiMsdk(Tap):
"name", th.StringType, required=True, description="name of the stream"
),
)
stream_properties.append(
th.Property(
"schema",
th.CustomType(
{"anyOf": [{"type": "string"}, {"type": "null"}, {"type:": "object"}]}
),
required=False,
description="A valid Singer schema or a path-like string that provides "
"the path to a `.json` file that contains a valid Singer "
"schema. If provided, the schema will not be inferred from "
"the results of an api call.",
),
)

# add streams schema to top-level properties
top_level_properties.append(
Expand Down Expand Up @@ -177,6 +191,33 @@ def discover_streams(self) -> List[DynamicStream]: # type: ignore
params = {**self.config.get("params", {}), **stream.get("params", {})}
headers = {**self.config.get("headers", {}), **stream.get("headers", {})}

schema = {}
schema_config = stream.get("schema")
if isinstance(schema_config, str):
self.logger.info("Found path to a schema, not doing discovery.")
with open(schema_config, "r") as f:
schema = json.load(f)

elif isinstance(schema_config, dict):
self.logger.info("Found schema in config, not doing discovery.")
builder = SchemaBuilder()
builder.add_schema(schema_config)
schema = builder.to_schema()

else:
self.logger.info("No schema found. Inferring schema from API call.")
schema = self.get_schema(
records_path,
except_keys,
stream.get(
"num_inference_records",
self.config["num_inference_records"],
),
path,
params,
headers,
)

streams.append(
DynamicStream(
tap=self,
Expand All @@ -196,17 +237,7 @@ def discover_streams(self) -> List[DynamicStream]: # type: ignore
pagination_request_style=self.config["pagination_request_style"],
pagination_response_style=self.config["pagination_response_style"],
pagination_page_size=self.config.get("pagination_page_size"),
schema=self.get_schema(
records_path,
except_keys,
stream.get(
"num_inference_records",
self.config["num_inference_records"],
),
path,
params,
headers,
),
schema=schema,
)
)

Expand Down
26 changes: 26 additions & 0 deletions tests/schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"$schema": "http://json-schema.org/schema#",
"type": "object",
"properties": {
"key1": {
"type": "string"
},
"key2": {
"type": "string"
},
"key3": {
"type": "string"
},
"field1": {
"type": "string"
},
"field2": {
"type": "integer"
}
},
"required": [
"key1",
"key2",
"key3"
]
}
36 changes: 24 additions & 12 deletions tests/test_tap.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import json

from tap_rest_api_msdk.tap import TapRestApiMsdk

from tests.test_streams import config, setup_api

with open("tests/schema.json", "r") as f:
BASIC_SCHEMA = json.load(f)


def test_schema_inference(requests_mock):
setup_api(requests_mock)
Expand All @@ -10,18 +15,25 @@ def test_schema_inference(requests_mock):
0
]

assert stream0.schema == {
"$schema": "http://json-schema.org/schema#",
"required": ["key1", "key2", "key3"],
"type": "object",
"properties": {
"field1": {"type": "string"},
"field2": {"type": "integer"},
"key1": {"type": "string"},
"key2": {"type": "string"},
"key3": {"type": "string"},
},
}
assert stream0.schema == BASIC_SCHEMA


def test_schema_from_file():
configs = config()
configs["streams"][0]["schema"] = "tests/schema.json"

s0 = TapRestApiMsdk(config=configs, parse_env_config=True).discover_streams()[0]

assert s0.schema == BASIC_SCHEMA


def test_schema_from_object():
configs = config()
configs["streams"][0]["schema"] = BASIC_SCHEMA

s0 = TapRestApiMsdk(config=configs, parse_env_config=True).discover_streams()[0]

assert s0.schema == BASIC_SCHEMA


def test_multiple_streams(requests_mock):
Expand Down

0 comments on commit c76d0f1

Please sign in to comment.