diff --git a/ack/readers/amazon_s3/cli.py b/ack/readers/amazon_s3/cli.py
index 442b506..3a5ea53 100644
--- a/ack/readers/amazon_s3/cli.py
+++ b/ack/readers/amazon_s3/cli.py
@@ -17,6 +17,7 @@
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
 import click
+
 from ack.readers.amazon_s3.reader import AmazonS3Reader
 from ack.utils.args import extract_args
 from ack.utils.processor import processor
@@ -26,6 +27,9 @@
 @click.option("--s3-bucket", required=True)
 @click.option("--s3-prefix", required=True, multiple=True)
 @click.option("--s3-format", required=True, type=click.Choice(["csv", "gz", "njson"]))
+@click.option("--s3-region-name", required=True)
+@click.option("--s3-access-key-id", required=True)
+@click.option("--s3-secret-access-key", required=True)
 @click.option("--s3-dest-key-split", default=-1, type=int)
 @click.option("--s3-csv-delimiter", default=",")
 @click.option("--s3-csv-fieldnames", default=None)
diff --git a/ack/readers/amazon_s3/config.py b/ack/readers/amazon_s3/config.py
index ab698ea..4cd29c6 100644
--- a/ack/readers/amazon_s3/config.py
+++ b/ack/readers/amazon_s3/config.py
@@ -13,3 +13,6 @@ class AmazonS3ReaderConfig(BaseModel):
     dest_key_split: int = 1
     csv_delimiter: str = ","
     csv_fieldnames: str = None
+    region_name: str
+    access_key_id: str
+    secret_access_key: str
diff --git a/ack/readers/amazon_s3/reader.py b/ack/readers/amazon_s3/reader.py
index c89ac8a..0c95c61 100644
--- a/ack/readers/amazon_s3/reader.py
+++ b/ack/readers/amazon_s3/reader.py
@@ -21,14 +21,17 @@
 
 class AmazonS3Reader(ObjectStorageReader):
-    def __init__(self, bucket, prefix, format, dest_key_split=-1, **kwargs):
+    def __init__(self, bucket, prefix, format, region_name, access_key_id, secret_access_key, dest_key_split=-1, **kwargs):
+        self._access_key_id = access_key_id
+        self._secret_access_key = secret_access_key
+        self._region_name = region_name
         super().__init__(bucket, prefix, format, dest_key_split, platform="S3", **kwargs)
 
-    def create_client(self, config):
+    def create_client(self):
         boto_config = {
-            "region_name": config.REGION_NAME,
-            "aws_access_key_id": config.AWS_ACCESS_KEY_ID,
-            "aws_secret_access_key": config.AWS_SECRET_ACCESS_KEY,
+            "region_name": self._region_name,
+            "aws_access_key_id": self._access_key_id,
+            "aws_secret_access_key": self._secret_access_key,
         }
         return boto3.resource("s3", **boto_config)
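
Reviewer note: with this change, AWS credentials travel through the reader's
constructor instead of a module-level config. A minimal usage sketch, assuming
direct instantiation with placeholder values (in normal use the click command
builds the reader via extract_args):

    from ack.readers.amazon_s3.reader import AmazonS3Reader

    reader = AmazonS3Reader(
        bucket="daily_reports",          # placeholder bucket
        prefix=["FR/offline_sales/"],    # a list, since the CLI option is multiple=True
        format="csv",
        region_name="eu-west-1",         # placeholder region and credentials below
        access_key_id="AKIA...",
        secret_access_key="...",
    )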
diff --git a/ack/readers/google_cloud_storage/cli.py b/ack/readers/google_cloud_storage/cli.py
index 22da833..8359ce2 100644
--- a/ack/readers/google_cloud_storage/cli.py
+++ b/ack/readers/google_cloud_storage/cli.py
@@ -17,6 +17,7 @@
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
 import click
+
 from ack.readers.google_cloud_storage.reader import GoogleCloudStorageReader
 from ack.utils.args import extract_args
 from ack.utils.processor import processor
@@ -29,6 +30,7 @@
 @click.option("--gcs-dest-key-split", default=-1, type=int)
 @click.option("--gcs-csv-delimiter", default=",")
 @click.option("--gcs-csv-fieldnames", default=None)
+@click.option("--gcs-project-id", required=True)
 @processor()
 def google_cloud_storage(**kwargs):
     return GoogleCloudStorageReader(**extract_args("gcs_", kwargs))
diff --git a/ack/readers/google_cloud_storage/config.py b/ack/readers/google_cloud_storage/config.py
index b423eed..edbcfae 100644
--- a/ack/readers/google_cloud_storage/config.py
+++ b/ack/readers/google_cloud_storage/config.py
@@ -10,6 +10,7 @@ class GoogleCloudStorageReaderConfig(BaseModel):
     bucket: str
     prefix: List[str]
     format: Literal[FORMATS]
+    project_id: str
     dest_key_split: int = -1
     csv_delimiter: str = ","
     fieldnames: str = None
diff --git a/ack/readers/google_cloud_storage/reader.py b/ack/readers/google_cloud_storage/reader.py
index 837600a..3bbc1f7 100644
--- a/ack/readers/google_cloud_storage/reader.py
+++ b/ack/readers/google_cloud_storage/reader.py
@@ -24,11 +24,12 @@
 
 class GoogleCloudStorageReader(ObjectStorageReader, GoogleClient):
-    def __init__(self, bucket, prefix, format, dest_key_split=-1, **kwargs):
+    def __init__(self, bucket, prefix, format, project_id, dest_key_split=-1, **kwargs):
+        self._project_id = project_id
         super().__init__(bucket, prefix, format, dest_key_split, platform="GCS", **kwargs)
 
-    def create_client(self, config):
-        return storage.Client(credentials=self._get_credentials(), project=config.project_id)
+    def create_client(self):
+        return storage.Client(credentials=self._get_credentials(), project=self._project_id)
 
     def create_bucket(self, client, bucket):
         return client.bucket(bucket)
diff --git a/ack/readers/object_storage/reader.py b/ack/readers/object_storage/reader.py
index 71c4e42..2c9e35f 100644
--- a/ack/readers/object_storage/reader.py
+++ b/ack/readers/object_storage/reader.py
@@ -18,7 +18,6 @@
 
 import tempfile
 
-from ack import config
 from ack.config import logger
 from ack.readers.reader import Reader
 from ack.streams.json_stream import JSONStream
@@ -27,7 +26,7 @@
 
 class ObjectStorageReader(Reader):
     def __init__(self, bucket, prefix, file_format, dest_key_split, platform=None, **kwargs):
-        self._client = self.create_client(config)
+        self._client = self.create_client()
         self._bucket = self.create_bucket(self._client, bucket)
         self._prefix_list = prefix
         self._platform = platform
@@ -70,7 +69,7 @@ def _result_generator(self, _object):
     def is_compatible_object(self, _object):
         return self.get_key(_object).endswith("." + self._format)
 
-    def create_client(self, config):
+    def create_client(self):
         raise NotImplementedError
 
     def create_bucket(self, client, bucket):
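
Reviewer note: create_client() losing its config parameter turns
ObjectStorageReader into a clean template method, with one subtlety: the parent
constructor calls self.create_client() immediately, so subclasses must set any
attributes it needs before calling super().__init__. Both readers in this diff
do so. A contract sketch with a hypothetical subclass (MyReader is illustrative
only, not part of the codebase):

    from ack.readers.object_storage.reader import ObjectStorageReader

    class MyReader(ObjectStorageReader):
        def __init__(self, bucket, prefix, format, api_key, dest_key_split=-1, **kwargs):
            # Must be set BEFORE super().__init__, which calls create_client().
            self._api_key = api_key
            super().__init__(bucket, prefix, format, dest_key_split, platform="X", **kwargs)

        def create_client(self):
            # Build and return the platform's SDK client from self._api_key.
            ...

        def create_bucket(self, client, bucket):
            # Called by the parent constructor right after create_client().
            ...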
diff --git a/ack/writers/google_bigquery/cli.py b/ack/writers/google_bigquery/cli.py
index 5d83b35..cb8ab26 100644
--- a/ack/writers/google_bigquery/cli.py
+++ b/ack/writers/google_bigquery/cli.py
@@ -24,6 +24,7 @@
 
 @click.command(name="write_bq")
 @click.option("--bq-dataset", required=True)
+@click.option("--bq-project-id", required=True)
 @click.option("--bq-table", required=True)
 @click.option("--bq-bucket", required=True)
 @click.option("--bq-partition-column")
diff --git a/ack/writers/google_bigquery/config.py b/ack/writers/google_bigquery/config.py
index 761548e..249b200 100644
--- a/ack/writers/google_bigquery/config.py
+++ b/ack/writers/google_bigquery/config.py
@@ -9,6 +9,7 @@
 
 class GoogleBigQueryWriterConfig(BaseModel):
     dataset: str
+    project_id: str
     table: str
     bucket: str
     partition_column: str = None
diff --git a/ack/writers/google_bigquery/writer.py b/ack/writers/google_bigquery/writer.py
index 76db122..399e937 100644
--- a/ack/writers/google_bigquery/writer.py
+++ b/ack/writers/google_bigquery/writer.py
@@ -17,7 +17,6 @@
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
 from google.cloud import bigquery
-from ack import config
 from ack.config import logger
 from ack.clients.google.client import GoogleClient
 from ack.streams.normalized_json_stream import NormalizedJSONStream
@@ -30,10 +29,10 @@
 class GoogleBigQueryWriter(Writer, GoogleClient):
     _client = None
 
     def __init__(
-        self, dataset, table, bucket, partition_column, write_disposition, location, keep_files,
+        self, dataset, project_id, table, bucket, partition_column, write_disposition, location, keep_files,
     ):
-        self._project_id = config.PROJECT_ID
+        self._project_id = project_id
         self._client = bigquery.Client(credentials=self._get_credentials(), project=self._project_id)
         self._dataset = dataset
         self._table = table
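
Reviewer note: same pattern on the writer side: project_id is now an explicit
constructor argument rather than config.PROJECT_ID. A minimal sketch with
placeholder values (instantiation immediately creates a bigquery.Client, so
valid Google credentials must be reachable via _get_credentials()):

    from ack.writers.google_bigquery.writer import GoogleBigQueryWriter

    writer = GoogleBigQueryWriter(
        dataset="analytics",           # placeholder
        project_id="my-gcp-project",   # previously read from config.PROJECT_ID
        table="offline_sales",
        bucket="staging-bucket",
        partition_column=None,
        write_disposition="TRUNCATE",  # placeholder; see the writers.rst options table
        location="EU",
        keep_files=False,
    )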
diff --git a/ack/writers/google_cloud_storage/cli.py b/ack/writers/google_cloud_storage/cli.py
index 30362b4..0bd4aae 100644
--- a/ack/writers/google_cloud_storage/cli.py
+++ b/ack/writers/google_cloud_storage/cli.py
@@ -25,7 +25,7 @@
 @click.command(name="write_gcs")
 @click.option("--gcs-bucket", help="GCS Bucket", required=True)
 @click.option("--gcs-prefix", help="GCS path to write the file.")
-@click.option("--gcs-project-id", help="GCS Project Id")
+@click.option("--gcs-project-id", help="GCS Project Id", required=True)
 @click.option(
     "--gcs-filename", help="Override the default name of the file (don't add the extension)",
 )
diff --git a/ack/writers/google_cloud_storage/writer.py b/ack/writers/google_cloud_storage/writer.py
index c4c8082..de9af59 100644
--- a/ack/writers/google_cloud_storage/writer.py
+++ b/ack/writers/google_cloud_storage/writer.py
@@ -16,16 +16,14 @@
 # along with this program; if not, write to the Free Software Foundation,
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
-import click
 from google.cloud import storage
-from ack import config
 from ack.clients.google.client import GoogleClient
 from ack.writers.object_storage.writer import ObjectStorageWriter
 
 
 class GoogleCloudStorageWriter(ObjectStorageWriter, GoogleClient):
     def __init__(self, bucket, project_id, prefix=None, filename=None, **kwargs):
-        self._project_id = self.get_project_id(project_id)
+        self._project_id = project_id
         super().__init__(bucket, prefix, filename, platform="GCS", **kwargs)
 
     def _create_client(self):
@@ -43,14 +41,3 @@ def _create_blob(self, file_name, stream):
 
     def _get_uri(self, file_name):
         return f"gs{self._get_file_path(file_name)}"
-
-    @staticmethod
-    def get_project_id(project_id):
-        if project_id is None:
-            try:
-                return config.PROJECT_ID
-            except Exception:
-                raise click.exceptions.MissingParameter(
-                    "Please provide a project id in ENV var or params.", param_type="--gcs-project-id",
-                )
-        return project_id
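
Reviewer note: dropping get_project_id also simplifies the failure mode: a
missing project id is now rejected by click at parse time (required=True)
instead of raising MissingParameter from inside the writer. A sketch of the
resulting construction, with placeholder values:

    from ack.writers.google_cloud_storage.writer import GoogleCloudStorageWriter

    writer = GoogleCloudStorageWriter(
        bucket="daily_reports",        # placeholder
        project_id="my-gcp-project",   # placeholder; no config.PROJECT_ID fallback anymore
        prefix="FR/offline_sales/",
    )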
diff --git a/docs/source/readers.rst b/docs/source/readers.rst
index 336ae8b..2b8e0a7 100644
--- a/docs/source/readers.rst
+++ b/docs/source/readers.rst
@@ -158,15 +158,7 @@ Source API
 Quickstart
 ----------
 
-Execute the following commands to set your credentials:
-
-.. code-block:: shell
-
-    export REGION_NAME=
-    export AWS_ACCESS_KEY_ID=
-    export AWS_SECRET_ACCESS_KEY=
-
-Once done, launch your S3 reader command. The following command retrieves the blobs located under the Amazon S3 bucket ``daily_reports`` and the blob prefix ``FR/offline_sales/``.
+The following command retrieves the blobs located under the Amazon S3 bucket ``daily_reports`` and the blob prefix ``FR/offline_sales/``.
 
 .. code-block:: shell
 
@@ -192,6 +184,9 @@ CMD Options JSON Options Definition
 ``--s3-bucket`` ``bucket`` S3 bucket name
 ``--s3-prefix`` ``prefix`` (list) S3 blob prefix. Several prefixes can be provided in a single command.
 ``--s3-format`` ``format`` S3 blob format. Possible values: csv, gz.
+``--s3-region-name`` ``region_name`` AWS region of the bucket
+``--s3-access-key-id`` ``access_key_id`` AWS access key ID
+``--s3-secret-access-key`` ``secret_access_key`` AWS secret access key
 ``--s3-dest-key-split`` ``dest_key_split`` Indicates how to retrieve a blob name from a blob key (a blob key being the combination of a blob prefix and a blob name: /). The reader splits the blob key on the "/" character: the last element of the output list is considered as the blob name, and is used to name the stream produced by the reader. This option defines how many splits to do. Default: -1 (split on all occurences).
 ``--s3-csv-delimiter`` ``csv_delimiter`` Delimiter that should be used to read the .csv file. Default: ,
 ``--s3-csv-fieldnames`` ``fieldnames`` List of field names. If set to None (default), the values in the first row of .csv file will be used as field names.
@@ -593,11 +588,10 @@ Follow these steps to set your credentials:
 
 - In your GCP project, create a Service Account with a 'Storage Object Viewer' role
 - Create a .JSON key for this Service Account, and download the key file locally
-- Execute the following commands:
+- Execute the following command:
 
 .. code-block:: shell
 
-    export project_id=
     export GCP_KEY_PATH=
 
 Once done, launch your Google Cloud Storage reader command. The following command retrieves the blobs located under the Google Cloud Storage bucket ``daily_reports`` and the blob prefix ``FR/offline_sales/``:
@@ -626,6 +620,7 @@ CMD Options JSON Options Definition
 ``--gcs-bucket`` ``bucket`` Cloud Storage bucket name
 ``--gcs-prefix`` ``prefix`` (list) Cloud Storage blob prefix. Several prefixes can be provided in a single command.
 ``--gcs-format`` ``format`` Cloud Storage blob format. *Possible values: csv, gz*
+``--gcs-project-id`` ``project_id`` GCP project ID
 ``--gcs-dest-key-split`` ``dest_key-split`` Indicates how to retrieve a blob name from a blob key (a blob key being the combination of a blob prefix and a blob name: /). The reader splits the blob key on the "/" character: the last element of the output list is considered as the blob name, and is used to name the stream produced by the reader. This option defines how many splits to do. *Default: -1 (split on all occurences)*
 ``--gcs-csv-delimiter`` ``csv_delimiter`` Delimiter that should be used to read the .csv file. *Default: ,*
 ``--gcs-csv-fieldnames`` ``csv_fieldnames`` List of field names. If set to *None* (*default*), the values in the first row of .csv file will be used as field names.
diff --git a/docs/source/writers.rst b/docs/source/writers.rst
index f10f5b6..49790aa 100644
--- a/docs/source/writers.rst
+++ b/docs/source/writers.rst
@@ -84,6 +84,7 @@ Command options
 CMD Options JSON Options Definition
 ============================== ====================== =================================================================================================================================================
 ``--bq-dataset`` ``dataset`` BigQuery dataset name
+``--bq-project-id`` ``project_id`` GCP project ID
 ``--bq-table`` ``table`` BigQuery table name
 ``--bq-write-disposition`` ``write-disposition`` BigQuery write disposition. Possible values: TRUNCATE (default), APPEND
 ``--bq-partition-column`` ``partition-column`` (Optional) Field to be used as a partition column (more information on `this page `__)
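
Reviewer note: because the new fields are plain required attributes on the
pydantic models, a JSON payload that omits them now fails fast with a
ValidationError instead of failing later at client-creation time. A sketch
against the S3 config, assuming its earlier fields mirror the GCS model
(placeholder values):

    from ack.readers.amazon_s3.config import AmazonS3ReaderConfig

    config = AmazonS3ReaderConfig(
        bucket="daily_reports",
        prefix=["FR/offline_sales/"],
        format="csv",
        region_name="eu-west-1",       # omitting any of the three new fields raises ValidationError
        access_key_id="AKIA...",
        secret_access_key="...",
    )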