From 55fd04b8160bd899bb12286c52a9c85c5cba751f Mon Sep 17 00:00:00 2001 From: Rifqi Naufal Abdjul Date: Tue, 4 Jun 2024 13:58:18 +0700 Subject: [PATCH] Add parquet support --- setup.py | 3 +- tap_spreadsheets_anywhere/configuration.py | 2 +- tap_spreadsheets_anywhere/format_handler.py | 8 ++ tap_spreadsheets_anywhere/parquet_handler.py | 31 +++++ .../test/iris-sample.parquet | Bin 0 -> 2448 bytes .../test/mt-sample.parquet | Bin 0 -> 2932 bytes .../test/test_parquet.py | 131 ++++++++++++++++++ 7 files changed, 173 insertions(+), 2 deletions(-) create mode 100644 tap_spreadsheets_anywhere/parquet_handler.py create mode 100644 tap_spreadsheets_anywhere/test/iris-sample.parquet create mode 100644 tap_spreadsheets_anywhere/test/mt-sample.parquet create mode 100644 tap_spreadsheets_anywhere/test/test_parquet.py diff --git a/setup.py b/setup.py index 59a1e17..f923029 100755 --- a/setup.py +++ b/setup.py @@ -18,7 +18,8 @@ 'openpyxl', 'xlrd', 'paramiko', - 'azure-storage-blob>=12.14.0' + 'azure-storage-blob>=12.14.0', + 'pyarrow>=5.0.0' ], packages=["tap_spreadsheets_anywhere"], include_package_data=True, diff --git a/tap_spreadsheets_anywhere/configuration.py b/tap_spreadsheets_anywhere/configuration.py index d131f25..df42f19 100644 --- a/tap_spreadsheets_anywhere/configuration.py +++ b/tap_spreadsheets_anywhere/configuration.py @@ -12,7 +12,7 @@ Required('pattern'): str, Required('start_date'): str, Required('key_properties'): [str], - Required('format'): Any('csv', 'excel', 'json', 'jsonl', 'detect'), + Required('format'): Any('csv', 'excel', 'json', 'jsonl', 'parquet', 'detect'), Optional('encoding'): str, Optional('invalid_format_action'): Any('ignore','fail'), Optional('universal_newlines'): bool, diff --git a/tap_spreadsheets_anywhere/format_handler.py b/tap_spreadsheets_anywhere/format_handler.py index 6bea333..fee0dc4 100644 --- a/tap_spreadsheets_anywhere/format_handler.py +++ b/tap_spreadsheets_anywhere/format_handler.py @@ -5,9 +5,12 @@ import tap_spreadsheets_anywhere.excel_handler import tap_spreadsheets_anywhere.json_handler import tap_spreadsheets_anywhere.jsonl_handler +import tap_spreadsheets_anywhere.parquet_handler + from azure.storage.blob import BlobServiceClient import os + class InvalidFormatError(Exception): def __init__(self, fname, message="The file was not in the expected format"): self.name = fname @@ -139,6 +142,8 @@ def get_row_iterator(table_spec, uri): format = 'jsonl' elif lowered_uri.endswith(".csv"): format = 'csv' + elif lowered_uri.endswith(".parquet"): + format = 'parquet' else: # TODO: some protocols provide the ability to pull format (content-type) info & we could make use of that here reader = get_streamreader(uri, universal_newlines=universal_newlines, open_mode='r', encoding=encoding) @@ -169,6 +174,9 @@ def get_row_iterator(table_spec, uri): # If encoding is set, smart_open will override binary mode ('b' in open_mode) and it will result in a BadZipFile error reader = get_streamreader(uri, universal_newlines=universal_newlines,newline=None, open_mode='rb', encoding=None) iterator = tap_spreadsheets_anywhere.excel_handler.get_row_iterator(table_spec, reader) + elif format == 'parquet': + reader = get_streamreader(uri, universal_newlines=universal_newlines, newline=None, open_mode='rb') + iterator = tap_spreadsheets_anywhere.parquet_handler.get_row_iterator(table_spec, reader) elif format == 'json': reader = get_streamreader(uri, universal_newlines=universal_newlines, open_mode='r', encoding=encoding) iterator = tap_spreadsheets_anywhere.json_handler.get_row_iterator(table_spec, reader) diff --git a/tap_spreadsheets_anywhere/parquet_handler.py b/tap_spreadsheets_anywhere/parquet_handler.py new file mode 100644 index 0000000..ccdb464 --- /dev/null +++ b/tap_spreadsheets_anywhere/parquet_handler.py @@ -0,0 +1,31 @@ +import re +import logging +import pyarrow.parquet as pq + +LOGGER = logging.getLogger(__name__) + + +def generator_wrapper(table, _={}) -> dict: + # change column name + def format_name(name=""): + formatted_key = re.sub(r"[^\w\s]", "", name) + # replace whitespace with underscores + formatted_key = re.sub(r"\s+", "_", formatted_key) + return formatted_key.lower() + + table = table.rename_columns([format_name(name) for name in table.column_names]) + + for row in table.to_pylist(): + yield row + + +def get_row_iterator(table_spec, file_handle): + try: + parquet_file = pq.ParquetFile(file_handle) + except Exception as e: + LOGGER.error("Unable to read the Parquet file: %s", e) + raise e + + # Use batch to read the Parquet file + for batch in parquet_file.iter_batches(): + yield from generator_wrapper(batch, table_spec) diff --git a/tap_spreadsheets_anywhere/test/iris-sample.parquet b/tap_spreadsheets_anywhere/test/iris-sample.parquet new file mode 100644 index 0000000000000000000000000000000000000000..9224dead94cc960679a7f5681efe4efdbe61bedc GIT binary patch literal 2448 zcmZ`*L1-Ii7JgDj^8efVlXZ3pC; zdH?&rdGCGSd%w%Cyb)^BKaJ|c>>u^rG@i3MOY~n^I738t)2X#gDmNZ|@^LDa8;idE zCY7>Aqs;l-+Gs?X6nmtvI7F-?84DdvXHqnFS-_xdIE8ioJV!QiQLCRkvB_aavfO&!Uh*vVA10Zx#hO|yEjP#u}B3KldA+>P!8pB ze@1QN-eI4{RKgxN!ySA-B1iea$m5I8D2J1k+ zxZ=-$qG`7+YoUiI60W+DtN^FMEiNNahxZXV?=2NWL9TAfJ%nTfsumXMIkmNPM9-?d zaFYrMw~G&ZT%+^alO2w*UdGP}YY4x|9ePf;7JIM*f%kBCm9tdG{svuCg4YvQu)dro6!ktqdJRu7@t3~DmYkEWlq*N8PABJy*18MU%M{bw>%nfVJ z1|foXN$Qxsef}JMGYeR*%9zrs?G~j&XMrlT9ag0PkVC_$q|`#zxdrL-@1XLnFkAJ&*s*lhJTH)KGThUTQh+mywVC%a}1Z)a46>%4=MeHww} z5=!d?Dz%^icr5L3D{Pe#Q-BJ7MGnGh*jy-*tdl?}+;z)PYfkmT9-d;izUVB)ypKHM zaKi+(SmiCsBA5(>vQyelaWdPQiRrx!H$lLasm^Ru?uPXRL9!e6coUT?%4nfN1vDKl zam?c5-(eBew+`pP;@aVy=}B>a4#d)tcF45gLv3d+KI@_9gk?IH9zcaQ{frVFXrF7` z$Xv6gA8N0JLRV;Cn}&OgY#*TNzPlqE3m%nxLd(_ZV7^7X#Xj3c@ilINrNhf;`a>O~ zsxjt)te8JBde64(2nZfTV0aY9z5~e=Q7#`6virYLNGzwS7IaXTWe!lq#N9+|ZAcFq zbIkbQ0Ar&ftHu$PH4+N|A#!Q4QUnw@H@i0dn2fe&#iF=)UqW;P(4Xk6QT?*DN{$13#l! zk1%mIq5K1k^D+x?^-zO3cqdSbPX|zYmm4mc%t$4gL7yII41|6a#Z(#7t!G&gKKWSr z7DZ^%N5D-%Hh4+sq4Gl?riO9aBbqN0J2FDYdKd(WTa_EQ2MA~nbdqx1mTW`AZj(KE zgggxJmRSI1f{A+w(WUj4j299rbqG>?jFajptib9?2*MI#B&ObEZ z#3l|w`rGR58@HB+@$~oAo43}l-nf3_rZjKsH?OU~yMA>Uv$mkU{=sPv4$iYL9`l)A zp_SB1TBqIXFF`UZ|GN72%3E*Vx>{X*XIT%E9-c@$xm&BXuCLzy_XJV?ZrUxl|Cq$lGwf;{rcZrEr?H;DGnW5! zXi~iFh{36GF>=Io|Kcc5{%=^3^%KO?L7o79I5heB&

~5qr#a3h}fRD^C!gO7aBq z^P$Nv&z?YDIDy=sdlKYn>r;>;r;2of_;q%&tAS|ZBC>!uD8N^J#Q(R#VuH-M6J)z5 zRUW{7mdf8vP5xMzyi+JR@6Yp<+0MNTC-*r8FBjCM!b zA)}qu>`IaC1XO}eFb1noOdRsil04+WI3X^@kH!Jh2f3tAhEjq9#RQx}z)eU)TYB!y zZY5y{I&if2oH_U0?|kRnGnzg$o7Itij_UVf13H6cOphV-CYcftLJUjr_n3XUVw9i@cxAD zxpuADg_*h9XgYzJKadeU(+SFEu8^6a*b^0!&m?73+}BTwnG$AVzgif{96yqaBQn*k z9m0KIm8MF2Ay~<-xSnU1%}LL3ny>kOr3b82ZmsNlHM8W`nr_XhHEmDh&eh;l!-Er_ zU9LNUYkT-y-=u3d&1|lSU$Tz+HAt@HuDNCWUEz8El-~r^4}3V_iJ$D-OKY}@?-%{b zl4F*tw&yvuB`3g>)}do1^XK-O}Phasu0&!utm%oglCS z*PL)`J}y;{*$sHpbj`zlVDl=R@VzzP%ocJ-pmZ0AepA>{#dCrYRS}7pj4-iNMI-F? z7RK}dTNE)JVR?+rYdQALWq6VbC6@;j6BYFgev;<%c|}E8&@+;XrZg1|Nh;Y&=~P@I zi-QV74Bn4{?E#bCCL1FXC1@Djhont%`Nxu_t&^#rDHi%E$Qa15WRYum#Y_#c#qb&7 z?RG)X*LOn3sR%MOA;JU^g(A9-;$BR!3DWl*K{z!;&{ztM^`QaoMnMQj#K51vmw^f} zpi}fBKsM`uc4H0xn4U!CjWxE2v3g1l8k=)3g1!PmF+x1zFW`3>ek1z+@<{sf&_H{51LgOG4P?gI`}Koeciy9l zk&;;N)n`6G-5UdkiK`Mjdw=HoV`mjMck$-e*4&=MK-~|>MGC)Y1)r4boecU>rtHpp z*oDor;Cz2Z<*8Cq z12Ai{%+B=U^Qc?-Ese8J@Q3tfoJoJA{~3Q={8b9G>aYL)`uNDV+j}TwB&UoO0!sKYn($2J6jh<> zVm8Vlt7NPk00u(7%d%NKllpU(eaVaZY8IM(JulS})OX-so_zupz9iScGe5G2bO}d1lJu8QG{iU%wwdP z8Xdz38yIVJ0Qxn2pm)Q@74*?vaavP^zt+fSu$y7}9N#@pTK>DwZTa&tq7mAGP>vwY zfD=umshWwY*2|$7RD{b#klda(g8t4UZ!Le13BO_@!3iwG0Fxmf5FT77Sk(y5U<=d8 z3d4<#^={8IURzyA{&fTyN@cbDYGvVN{@qx%bpz>gzhH>Ve#PAQVgO!!rBTXM1g>qFwjRr;$58nmo#b+m`KPVyjBokhqenr z$YLA9KZNw#-2eeR5ZZVHAcUW?TQebqhpk}YxkM142vHm%ab8GYhywyXd=0`!HAk?m zwf03EkP1|+jbNiAeMtob(eUa41nud|NqE@0D-smE9p?`arw8J+XP80TLx{(D*t#oD z*e~r!S31&L35XOj0VAcw4jK$OdDz;D)Z%2V5ypY4Y~u8{N#KNET7wrQzX^O1a1YF>W!{}2uVH*D5{xANHJ=6dI literal 0 HcmV?d00001 diff --git a/tap_spreadsheets_anywhere/test/test_parquet.py b/tap_spreadsheets_anywhere/test/test_parquet.py new file mode 100644 index 0000000..6e89987 --- /dev/null +++ b/tap_spreadsheets_anywhere/test/test_parquet.py @@ -0,0 +1,131 @@ +import logging +import unittest + +from tap_spreadsheets_anywhere import format_handler + +LOGGER = logging.getLogger(__name__) + +TEST_TABLE_SPEC = { + "tables": [ + { + "path": "file://./tap_spreadsheets_anywhere/test", + "name": "parquet-iris", + "pattern": "iris\\-sample\\.parquet", + "start_date": "2017-05-01T00:00:00Z", + "key_properties": [], + "format": "parquet", + }, + { + "path": "file://./tap_spreadsheets_anywhere/test", + "name": "parquet-mt", + "pattern": "mt\\-sample\\.parquet", + "start_date": "2017-05-01T00:00:00Z", + "key_properties": [], + "format": "parquet", + }, + { + "path": "file://./tap_spreadsheets_anywhere/test", + "name": "parquet-iris-detect", + "pattern": "iris\\.parquet", + "start_date": "2017-05-01T00:00:00Z", + "key_properties": [], + "format": "detect", + }, + { + "path": "file://./tap_spreadsheets_anywhere/test", + "name": "parquet-mt-detect", + "pattern": "mt\\.parquet", + "start_date": "2017-05-01T00:00:00Z", + "key_properties": [], + "format": "detect", + }, + ] +} + + +class TestParquet(unittest.TestCase): + def test_iris(self): + table_spec = TEST_TABLE_SPEC["tables"][0] + uri = "./tap_spreadsheets_anywhere/test/iris-sample.parquet" + iterator = format_handler.get_row_iterator(table_spec, uri) + + rows = list(iterator) + self.assertEqual(len(rows), 150) + self.assertEqual( + rows[0], + { + "sepallength": 5.1, + "sepalwidth": 3.5, + "petallength": 1.4, + "petalwidth": 0.2, + "variety": "Setosa", + }, + ) + + def test_mt(self): + table_spec = TEST_TABLE_SPEC["tables"][1] + uri = "./tap_spreadsheets_anywhere/test/mt-sample.parquet" + iterator = format_handler.get_row_iterator(table_spec, uri) + + rows = list(iterator) + self.assertEqual(len(rows), 32) + self.assertEqual( + rows[0], + { + "model": "Mazda RX4", + "mpg": 21.0, + "cyl": 6, + "disp": 160.0, + "hp": 110, + "drat": 3.9, + "wt": 2.62, + "qsec": 16.46, + "vs": 0, + "am": 1, + "gear": 4, + "carb": 4, + }, + ) + + def test_iris_detect(self): + table_spec = TEST_TABLE_SPEC["tables"][2] + uri = "./tap_spreadsheets_anywhere/test/iris-sample.parquet" + iterator = format_handler.get_row_iterator(table_spec, uri) + + rows = list(iterator) + self.assertEqual(len(rows), 150) + self.assertEqual( + rows[0], + { + "sepallength": 5.1, + "sepalwidth": 3.5, + "petallength": 1.4, + "petalwidth": 0.2, + "variety": "Setosa", + }, + ) + + def test_mt_detect(self): + table_spec = TEST_TABLE_SPEC["tables"][3] + uri = "./tap_spreadsheets_anywhere/test/mt-sample.parquet" + iterator = format_handler.get_row_iterator(table_spec, uri) + + rows = list(iterator) + self.assertEqual(len(rows), 32) + self.assertEqual( + rows[0], + { + "model": "Mazda RX4", + "mpg": 21.0, + "cyl": 6, + "disp": 160.0, + "hp": 110, + "drat": 3.9, + "wt": 2.62, + "qsec": 16.46, + "vs": 0, + "am": 1, + "gear": 4, + "carb": 4, + }, + )