-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FEATURE] Http Request Transformation in UDF #151
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, good job :) - left you some comments (nothing too major)
import socket | ||
import ssl | ||
|
||
_HttpRequestTransformation_chunk_size_column = "HttpRequestTransformation_chunk_size_column" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's make these constants, meaning:
- you don't have to start with an underscore (no need to mark them private)
- make them all caps: "CHUNK_SIZE_COLUMN" for example
- you can remove the "HttpRequestTransformation" part from them (for brevity)
default="application/json", | ||
description="The content type of the request.", | ||
) | ||
authorization_token: Union[str, None] = Field( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make this a SecretStr - we don't want tokens to be printed unnecessarily
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from koheesio.models import SecretStr
default=None, | ||
description="The authorization token for the request.", | ||
) | ||
body_column: Union[str, None] = Field( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right type would be Optional[str]
rather than Union[str, None]
(same for the other Fields where you have it like this)
|
||
self.output.df = self.df | ||
|
||
@udf |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's move the udf to the top of the file, makes it a bit more readable
if ":" in rest: | ||
base_url, rest = rest.split(":", 1) | ||
if "/" in rest: | ||
port, path = rest.split("/", 1) | ||
else: | ||
port = rest | ||
port = int(port) | ||
else: | ||
base_url, path = rest.split("/", 1) | ||
port = 80 if protocol == "http" else 443 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we do this with regex instead? Seems a bit cleaner than this imho.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for example:
import re
import pytest
url_parts = re.compile(
# protocol is a sequence of letters, digits, and characters followed by ://
r'^(?P<protocol>[^:]+)://'
# base_url is a sequence of characters until encountering a `/` or a `:` followed by 1 to 5 digits
r'(?P<base_url>\S+?(?=:\d{1,5}/|/|$))'
# port is an optional sequence of digits between 0 and 65535 preceded by a colon
r'(?::(?P<port>\d{1,5}))?'
# rest is the rest of the url
r'(?P<rest>\/.*)?$'
)
test_data = [
# url, expected_protocol, expected_base_url, expected_port, expected_rest
("https://example.com:443/some/random/url", "https", "example.com", "443", "/some/random/url"),
("http://something.a.bit.more.complicated:9999/foo/bar", "http", "something.a.bit.more.complicated", "9999", "/foo/bar"),
("https://no-port-example.ext/42/index.jsp?foo=bar&baz=bla", "https", "no-port-example.ext", None, "/42/index.jsp?foo=bar&baz=bla"),
("ftp://ftp.example.com/resource.txt", "ftp", "ftp.example.com", None, "/resource.txt"),
("http://localhost:8080/test", "http", "localhost", "8080", "/test"),
("https://sub.domain.example.com/path/to/resource?query=string&another=value", "https", "sub.domain.example.com", None, "/path/to/resource?query=string&another=value"),
("http://192.168.1.1:8080/admin", "http", "192.168.1.1", "8080", "/admin"),
("https://user:[email protected]:8443/path?query=param#fragment", "https", "user:[email protected]", "8443", "/path?query=param#fragment"),
("http://example.org", "http", "example.org", None, None),
("https://example.net/path/to/resource.html", "https", "example.net", None, "/path/to/resource.html"),
("http://example.com:80/path/to/resource?query=param", "http", "example.com", "80", "/path/to/resource?query=param"),
("custom_protocol://base_url:foo/rest", "custom_protocol", "base_url:foo", None, "/rest"),
("WEBDAV://example.com:8080/path/to/resource", "WEBDAV", "example.com", "8080", "/path/to/resource"),
]
@pytest.mark.parametrize("url, expected_protocol, expected_base_url, expected_port, expected_rest", test_data)
def test_url_parts(url, expected_protocol, expected_base_url, expected_port, expected_rest):
match = url_parts.match(url)
assert match is not None, f"Failed to match {url}"
assert match.group("protocol") == expected_protocol, f"Failed to match protocol: {match.groupdict()}"
assert match.group("port") == expected_port, f"Failed to match port: {match.groupdict()}"
assert match.group("base_url") == expected_base_url, f"Failed to match base_url: {match.groupdict()}"
assert match.group("rest") == expected_rest, f"Failed to match rest: : {match.groupdict()}"
chunk = s.recv(chunk_size) | ||
if len(chunk) == 0: # No more data received, quitting | ||
break | ||
response = response + chunk |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
response += chunk
def test_downloading_files(self, input_df: DataFrame, download_path: Path) -> None: | ||
"""Test that the files are downloaded and the DataFrame is transformed correctly.""" | ||
# Arrange | ||
expected_data = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm pretty sure I saved the expected data in a file already in the data folder of the tests right? Either way, I would like to take this hardcoding out of the code and refer to a file instead.
This PR makes it possible to execute a Http request from a node in the cluster rather than collecting the data on the driver and executing the request there.
Description
In this PR a UDF was created that can handle http requests. The implementation is based on the socket and ssl library because the requests library is not able to be pickeled. The UDF can handle multiple scenarios GET, POST, PUT, DELETE, ... With authorization and a body to be send.
Proper documentation still needs to be written. Waiting for any remarks before doing so.
download file transformation
Motivation and Context
This is a suggestion to make the download file transformation proces better. Looking for feedback on the usecase.
How Has This Been Tested?
A test is written.
Screenshots (if appropriate):
Types of changes
Checklist: