forked from Azure-Samples/modern-data-warehouse-dataops
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathupload-file-to-lakehouse.py
129 lines (104 loc) · 5.34 KB
/
upload-file-to-lakehouse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import argparse
import os
from typing import Generator, Union
from azure.devops.connection import Connection
from azure.devops.v7_0.git.models import GitVersionDescriptor
from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeFileClient, DataLakeServiceClient, FileSystemClient
from dotenv import load_dotenv
from msrest.authentication import BasicAuthentication
# Load variables from a .env file into the process environment before reading them.
load_dotenv()

# Fabric / OneLake settings
account_name = os.getenv("ONELAKE_ACCOUNT_NAME")
workspace_id = os.getenv("FABRIC_WORKSPACE_ID")
lakehouse_id = os.getenv("FABRIC_LAKEHOUSE_ID")

# Azure Repos settings (each value is None when the variable is not set)
organization_name = os.getenv("GIT_ORGANIZATION_NAME")
personal_access_token = os.getenv("GIT_PERSONAL_ACCESS_TOKEN")
project_name = os.getenv("GIT_PROJECT_NAME")
repo_name = os.getenv("GIT_REPO_NAME")
branch_name = os.getenv("GIT_BRANCH_NAME")
def get_authentication_token() -> DefaultAzureCredential:
    """Create the Azure credential used for authentication.

    Returns:
        A newly constructed ``DefaultAzureCredential``.
    """
    credential = DefaultAzureCredential()
    return credential
def get_file_system_client(token_credential: DefaultAzureCredential) -> FileSystemClient:
    """Build a file system client for the configured Fabric workspace.

    Args:
        token_credential: Credential handed to the Data Lake service client.

    Returns:
        The file system client for ``workspace_id``, obtained from a service
        client pointed at ``https://<account_name>.dfs.fabric.microsoft.com``.
    """
    datalake_service = DataLakeServiceClient(
        f"https://{account_name}.dfs.fabric.microsoft.com",
        credential=token_credential,
    )
    return datalake_service.get_file_system_client(workspace_id)
def get_azure_repo_connection() -> Connection:
    """Open an Azure DevOps connection authenticated with the personal access token.

    Returns:
        A ``Connection`` to ``https://dev.azure.com/<organization_name>`` using
        basic authentication with an empty user name and the configured token.
    """
    base_url = f"https://dev.azure.com/{organization_name}"
    pat_credentials = BasicAuthentication("", personal_access_token)
    return Connection(base_url=base_url, creds=pat_credentials)
def read_file_from_repo(connection: Connection, project_name: str, branch_name: str, src_file_name: str) -> Generator:
    """Fetch the content of a file from the Azure Repo at a given branch.

    The repository is taken from the module-level ``repo_name`` setting.

    Args:
        connection: Azure DevOps connection used to obtain the git client.
        project_name: Azure DevOps project that contains the repository.
        branch_name: Branch to read the file from.
        src_file_name: Path of the file inside the repository.

    Returns:
        The result of the git client's ``get_item_content`` call.
    """
    return connection.clients.get_git_client().get_item_content(
        repository_id=repo_name,
        project=project_name,
        path=src_file_name,
        version_descriptor=GitVersionDescriptor(version=branch_name, version_type="branch"),
    )
def write_file_to_lakehouse(
    file_system_client: FileSystemClient,
    source_file_path: str,
    target_file_path: str,
    upload_from: str,
    connection: Union[Connection, None] = None,
) -> None:
    """Upload a file to ``<lakehouse_id>/Files/<target_file_path>`` in the Fabric Lakehouse.

    Any existing file at the target path is overwritten.

    Args:
        file_system_client: File system client used to obtain the target file client.
        source_file_path: Path of the local file, or path of the file inside the Azure Repo.
        target_file_path: Destination path, relative to the lakehouse ``Files`` folder.
        upload_from: ``"local"`` to read from disk, ``"git"`` to read from the Azure Repo.
        connection: Azure DevOps connection; required when ``upload_from`` is ``"git"``.

    Returns:
        None. An error line is printed (nothing is uploaded) when ``upload_from``
        is not recognised or the git connection is missing.
    """
    data_path = f"{lakehouse_id}/Files/{target_file_path}"
    file_client: DataLakeFileClient = file_system_client.get_file_client(data_path)

    if upload_from == "local":
        print(f"[I] Uploading local '{source_file_path}' to '{target_file_path}'")
        with open(source_file_path, "rb") as file:
            file_client.upload_data(file.read(), overwrite=True)
    elif upload_from == "git" and connection:
        print(f"[I] Uploading from git '{source_file_path}' to '{target_file_path}'")
        chunks = read_file_from_repo(connection, project_name, branch_name, source_file_path)
        # Upload the raw bytes unchanged. Decoding chunk by chunk fails when a
        # multi-byte character is split across two chunks, and cannot handle
        # files that are not valid UTF-8 text.
        file_client.upload_data(b"".join(chunks), overwrite=True)
    else:
        print(f"[E] Invalid upload_from value: '{upload_from}' or missing connection")
def read_from_fabric_lakehouse(file_system_client: FileSystemClient, target_file_path: str) -> None:
    """Download a file from the Fabric Lakehouse and print its raw bytes.

    Args:
        file_system_client: File system client used to obtain the file client.
        target_file_path: Path of the file, relative to the lakehouse ``Files`` folder.
    """
    data_path = f"{lakehouse_id}/Files/{target_file_path}"
    file_client: DataLakeFileClient = file_system_client.get_file_client(data_path)
    print(f"[I] Reading the file just uploaded from {data_path}")
    # Read the whole download into memory and echo it as a bytes literal.
    print(file_client.download_file().readall())
def main(source_file_path: str, target_file_path: str, upload_from: str) -> None:
    """Run the upload workflow: log the settings, upload the file, then read it back.

    Args:
        source_file_path: Path of the local file, or path of the file inside the Azure Repo.
        target_file_path: Destination path, relative to the lakehouse ``Files`` folder.
        upload_from: ``"local"`` or ``"git"``.
    """
    print(f"[I] Fabric workspace id: {workspace_id}")
    print(f"[I] Fabric lakehouse id: {lakehouse_id}")
    print(f"[I] Source file path: {source_file_path}")
    print(f"[I] Target file path: {target_file_path}")
    print(f"[I] Upload from: {upload_from}")

    # An Azure DevOps connection is only created for git uploads.
    azure_repo_connection = get_azure_repo_connection() if upload_from == "git" else None
    file_system_client = get_file_system_client(get_authentication_token())

    # Upload first, then read the file back from the lakehouse.
    write_file_to_lakehouse(
        file_system_client,
        source_file_path,
        target_file_path,
        upload_from,
        connection=azure_repo_connection,
    )
    read_from_fabric_lakehouse(file_system_client, target_file_path)
if __name__ == "__main__":
    # Command-line entry point: two positional paths plus an optional upload source.
    cli = argparse.ArgumentParser(
        description="Script to upload local file or file from Azure Repo to Fabric lakehouse."
    )
    cli.add_argument(
        "source_file_path",
        type=str,
        help="The source file path of the local file or in the Azure Repo.",
    )
    cli.add_argument(
        "target_file_path",
        type=str,
        help="The target file path in the Fabric lakehouse.",
    )
    cli.add_argument(
        "--upload_from",
        type=str,
        default="local",
        choices=["local", "git"],
        help="Specify the source of the file to upload: 'local' or 'git'. Default is 'local'.",
    )
    cli_args = cli.parse_args()
    main(
        source_file_path=cli_args.source_file_path,
        target_file_path=cli_args.target_file_path,
        upload_from=cli_args.upload_from,
    )