-
Notifications
You must be signed in to change notification settings - Fork 0
/
conftest.py
52 lines (36 loc) · 1.39 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import json
import os
import pytest
from tap_krow.tap import TapKrow
class FakeResponse(object):
def __init__(self, response_body: str):
self.response_body = response_body
def json(self):
return json.loads(self.response_body)
@pytest.fixture(scope="session", autouse=True)
def tap_instance():
SAMPLE_CONFIG = {"api_key": "testing"}
return TapKrow(SAMPLE_CONFIG)
@pytest.fixture(scope="session")
def stream_factory(tap_instance):
def get_new_stream(stream_class):
return stream_class(tap_instance)
return get_new_stream
@pytest.fixture(scope="session", autouse=True)
def api_responses():
responses = {}
for root, dirs, _files in os.walk("tap_krow/tests/api_responses"):
for d in dirs:
responses[d] = {}
response_files = [f for f in os.listdir(os.path.join(root, d)) if f.endswith(".json")]
for file in response_files:
with open(os.path.join(root, d, file)) as f:
responses[d][file] = FakeResponse(f.read())
return responses
@pytest.fixture
def get_parsed_records():
"""This fixture returns a function so you can pass in an API response body
as a string and get back a parsed response object, parsed by the Stream"""
def response_parser(stream_instance, response):
return list(stream_instance.parse_response(response))
return response_parser