diff --git a/core/setup.py b/core/setup.py index 296ea562..5208d919 100644 --- a/core/setup.py +++ b/core/setup.py @@ -151,6 +151,7 @@ def get_long_description(package_dir): } META_FILE = read(META_PATH) INSTALL_REQUIRES = [ + "apache-beam[gcp]", "click", "glom", "google-api-python-client", diff --git a/core/src/klio_core/config/_io.py b/core/src/klio_core/config/_io.py index a964b2b4..302e5286 100644 --- a/core/src/klio_core/config/_io.py +++ b/core/src/klio_core/config/_io.py @@ -18,8 +18,9 @@ import json import logging - import attr +from apache_beam.io.filesystem import CompressionTypes + logger = logging.getLogger("klio") @@ -255,6 +256,7 @@ def from_dict(cls, config_dict, *args, **kwargs): class KlioFileConfig(object): name = "file" + compression_type = attr.attrib(type=str, default=CompressionTypes.AUTO) @attr.attrs(frozen=True) @@ -286,6 +288,11 @@ def as_dict(self): @attr.attrs(frozen=True) class KlioWriteFileConfig(KlioEventOutput, KlioFileConfig): file_path_prefix = attr.attrib(type=str) + file_name_suffix = attr.attrib(type=str, default="") + append_trailing_newlines = attr.attrib(type=bool, default=True) + num_shards = attr.attrib(type=int, default=0) + shard_name_template = attr.attrib(type=str, default=None) + header = attr.attrib(type=str, default=None) @classmethod def from_dict(cls, config_dict, *args, **kwargs): diff --git a/core/tests/config/test_config.py b/core/tests/config/test_config.py index 6d300185..b254f96b 100644 --- a/core/tests/config/test_config.py +++ b/core/tests/config/test_config.py @@ -432,17 +432,33 @@ def test_klio_read_file_config(): assert config_dict["location"] == klio_read_file_config.file_pattern -def test_klio_write_file_config(): - config_dict = { - "type": "GCS", - "location": "gs://sigint-output/test-parent-job-out", - } +@pytest.mark.parametrize( + "config_dict", + ( + # The minimum inputs + {"type": "GCS", "location": "gs://sigint-output/test-parent-job-out"}, + # Other fields configured + { + "type": "GCS", + "location": "gs://sigint-output/test-parent-job-out", + "file_name_suffix": ".txt", + "num_shards": 3, + "append_trailing_newlines": True, + }, + ), +) +def test_klio_write_file_config(config_dict): klio_write_file_config = io.KlioWriteFileConfig.from_dict( config_dict, io.KlioIOType.DATA, io.KlioIODirection.OUTPUT ) + config_dict.pop("type") assert "file" == klio_write_file_config.name - assert config_dict["location"] == klio_write_file_config.file_path_prefix + for k, v in config_dict.items(): + if k == "location": + assert v == klio_write_file_config.file_path_prefix + else: + assert v == getattr(klio_write_file_config, k) def test_klio_write_bigquery_config(): diff --git a/core/tox.ini b/core/tox.ini index 99cf367b..5c436f26 100644 --- a/core/tox.ini +++ b/core/tox.ini @@ -1,9 +1,15 @@ [tox] -envlist = py36,py37,py38,docs,manifest,check-formatting,lint +envlist = {py36,py37,py38}-beam{26,27,28},docs,manifest,check-formatting,lint [testenv] install_command = python -m pip install {opts} {packages} extras = tests +deps = + {toxinidir}/../core + beam26: apache-beam[gcp]>=2.26.0,<2.27.0 + beam27: apache-beam[gcp]>=2.27.0,<2.28.0 + beam28: apache-beam[gcp]>=2.28.0,<2.29.0 + commands = coverage run -m pytest {posargs} ; assert that protos are compiled @@ -85,6 +91,8 @@ filterwarnings = ignore:invalid escape sequence:DeprecationWarning ; 3rd party libraries haven't updated their use of collections.abc (py37+) ignore:Using or importing the ABCs from:DeprecationWarning + ; Apache Beam-related warnings + ignore:Running the Apache Beam SDK on Python 3:UserWarning ; required for mapping envs -> github runtimes [gh-actions]