This repository has been archived by the owner on May 1, 2024. It is now read-only.

Spike: copy S3 database/schema data to another location in S3 #800

Open

wants to merge 8 commits into base: master
95 changes: 95 additions & 0 deletions edx/analytics/tasks/common/vertica_export.py
@@ -5,6 +5,7 @@
import json
import logging
import re
import subprocess

import luigi

@@ -368,3 +369,97 @@ def run(self):

with self.output().open('w') as metadata_file:
json.dump(metadata, metadata_file)


class CopyVerticaSchemaFromS3ToS3Task(VerticaExportMixin, luigi.Task):
"""
    Provides an entry point for copying exported Vertica schema data from one location in S3 to another.
"""
new_warehouse_path = luigi.Parameter(
description='The warehouse_path URL to which to copy database data.'
)
    # Override inherited parameters that are not needed (or should not be used) by this task.
vertica_credentials = None
intermediate_warehouse_path = None

def __init__(self, *args, **kwargs):
"""
Inits this Luigi task.
"""
super(CopyVerticaSchemaFromS3ToS3Task, self).__init__(*args, **kwargs)
self.metadata = None

@property
def schema_metadata(self):
if self.metadata is None:
metadata_target = self.get_schema_metadata_target()
with metadata_target.open('r') as metadata_file:
self.metadata = json.load(metadata_file)
return self.metadata

def get_table_list_for_schema(self):
return self.schema_metadata['table_list']

def requires(self):
return ExternalURL(self.get_schema_metadata_target().path)

def output(self):
partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
metadata_location = url_path_join(
self.new_warehouse_path,
'import/vertica/sqoop/',
self.vertica_warehouse_name,
self.vertica_schema_name,
'_metadata_export_schema',
partition_path_spec,
'_metadata'
)
return get_target_from_url(metadata_location)

def copy_table(self, table_name):
partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
source_path = url_path_join(
self.warehouse_path,
'import/vertica/sqoop/',
self.vertica_warehouse_name,
self.vertica_schema_name,
table_name,
partition_path_spec,
)
destination_path = url_path_join(
self.new_warehouse_path,
'import/vertica/sqoop/',
self.vertica_warehouse_name,
self.vertica_schema_name,
table_name,
partition_path_spec,
)
        # The first attempt was to create a ScalableS3Client() wrapper in __init__ and call:
        #   self.s3_client.copy(source_path, destination_path, part_size=3000000000)
        # That worked while every file stayed below part_size, but the LMS and ecommerce exports
        # contain files that exceed the limit by a wide margin, and the resulting multi-part
        # uploads failed with "SignatureDoesNotMatch" errors from S3.  Since this code still uses
        # the older boto library rather than boto3 (which would require a Luigi upgrade), it was
        # easier to install awscli and shell out to it to copy each table's data.
command = 'aws s3 cp {source_path} {destination_path} --recursive'.format(
source_path=source_path, destination_path=destination_path
)
        try:
            log.info("Calling '{}'".format(command))
            return_val = subprocess.check_call(command, shell=True)
            log.info("Call returned '{}'".format(return_val))
        except subprocess.CalledProcessError as exception:
            log.error("Copy of '{}' failed: {}".format(table_name, exception))
            raise

def copy_metadata_file(self):
self.copy_table('_metadata_export_schema')

def run(self):
if self.new_warehouse_path == self.warehouse_path:
            raise Exception("Must set new_warehouse_path {} to be different from warehouse_path {}".format(self.new_warehouse_path, self.warehouse_path))

for table_name in self.get_table_list_for_schema():
self.copy_table(table_name)

self.copy_metadata_file()
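
For reference, if the Luigi/boto3 upgrade mentioned in the comment above ever lands, the per-table copy could be done with boto3's managed transfer instead of shelling out to awscli. The sketch below is illustrative only and not part of this change: the helper name copy_s3_prefix, the threshold and chunk-size values, and the use of six are assumptions. boto3's copy() splits large objects into parts automatically, which is exactly the case the ScalableS3Client attempt tripped over.

# Sketch only: a boto3-based replacement for the awscli subprocess call, assuming boto3
# becomes available after a Luigi upgrade.  Names and sizes here are illustrative.
import boto3
from boto3.s3.transfer import TransferConfig
from six.moves.urllib.parse import urlparse


def copy_s3_prefix(source_url, destination_url):
    """Copy every object under source_url to the same relative key under destination_url."""
    s3 = boto3.client('s3')
    src = urlparse(source_url)
    dst = urlparse(destination_url)
    src_bucket, src_prefix = src.netloc, src.path.lstrip('/')
    dst_bucket, dst_prefix = dst.netloc, dst.path.lstrip('/')

    # Use multi-part copies for anything over ~1 GiB, in 256 MiB parts.
    config = TransferConfig(multipart_threshold=1024 ** 3, multipart_chunksize=256 * 1024 ** 2)

    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=src_bucket, Prefix=src_prefix):
        for obj in page.get('Contents', []):
            relative_key = obj['Key'][len(src_prefix):].lstrip('/')
            destination_key = '/'.join([dst_prefix.rstrip('/'), relative_key])
            # boto3 performs the multi-part copy itself when the object exceeds the threshold.
            s3.copy(
                CopySource={'Bucket': src_bucket, 'Key': obj['Key']},
                Bucket=dst_bucket,
                Key=destination_key,
                Config=config,
            )
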
117 changes: 115 additions & 2 deletions edx/analytics/tasks/warehouse/load_internal_reporting_database.py
@@ -5,6 +5,7 @@
import json
import logging
import re
import subprocess

import luigi

@@ -17,6 +18,8 @@
from edx.analytics.tasks.common.vertica_load import SchemaManagementTask, VerticaCopyTask
from edx.analytics.tasks.util.hive import HivePartition, WarehouseMixin
from edx.analytics.tasks.util.url import ExternalURL, get_target_from_url, url_path_join
from edx.analytics.tasks.util.s3_util import ScalableS3Client


try:
from google.cloud.bigquery import SchemaField
@@ -1007,7 +1010,7 @@ def __init__(self, *args, **kwargs):
Inits this Luigi task.
"""
super(ImportMysqlDatabaseFromS3ToSnowflakeSchemaTask, self).__init__(*args, **kwargs)
metadata_target = self.get_schema_metadata_target()
metadata_target = self.get_database_metadata_target()
with metadata_target.open('r') as metadata_file:
self.metadata = json.load(metadata_file)

@@ -1016,7 +1019,7 @@ def __init__(self, *args, **kwargs):
def get_table_list_for_database(self):
return self.metadata['table_list']

def get_schema_metadata_target(self):
def get_database_metadata_target(self):
partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
metadata_location = url_path_join(
self.warehouse_path,
@@ -1060,3 +1063,113 @@ def complete(self):
"""
# OverwriteOutputMixin changes the complete() method behavior, so we override it.
return all(r.complete() for r in luigi.task.flatten(self.requires()))


class CopyMysqlDatabaseFromS3ToS3Task(WarehouseMixin, luigi.Task):
"""
    Provides an entry point for copying a MySQL database dump destined for Snowflake from one location in S3 to another.
"""
date = luigi.DateParameter(
default=datetime.datetime.utcnow().date(),
)
database = luigi.Parameter(
description='Name of database as stored in S3.'
)
warehouse_subdirectory = luigi.Parameter(
default='import_mysql_to_vertica',
description='Subdirectory under warehouse_path to store intermediate data.'
)
new_warehouse_path = luigi.Parameter(
description='The warehouse_path URL to which to copy database data.'
)

def __init__(self, *args, **kwargs):
"""
Inits this Luigi task.
"""
super(CopyMysqlDatabaseFromS3ToS3Task, self).__init__(*args, **kwargs)
self.metadata = None

@property
def database_metadata(self):
if self.metadata is None:
metadata_target = self.get_database_metadata_target()
with metadata_target.open('r') as metadata_file:
self.metadata = json.load(metadata_file)
return self.metadata

def get_database_metadata_target(self):
partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
metadata_location = url_path_join(
self.warehouse_path,
self.warehouse_subdirectory,
self.database,
DUMP_METADATA_OUTPUT,
partition_path_spec,
METADATA_FILENAME,
)
return get_target_from_url(metadata_location)

def get_table_list_for_database(self):
return self.database_metadata['table_list']

def requires(self):
return ExternalURL(self.get_database_metadata_target().path)

def output(self):
partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
metadata_location = url_path_join(
self.new_warehouse_path,
self.warehouse_subdirectory,
self.database,
DUMP_METADATA_OUTPUT,
partition_path_spec,
METADATA_FILENAME,
)
return get_target_from_url(metadata_location)

def copy_table(self, table_name):
partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
source_path = url_path_join(
self.warehouse_path,
self.warehouse_subdirectory,
self.database,
table_name,
partition_path_spec,
)
destination_path = url_path_join(
self.new_warehouse_path,
self.warehouse_subdirectory,
self.database,
table_name,
partition_path_spec,
)
        # The first attempt was to create a ScalableS3Client() wrapper in __init__ and call:
        #   self.s3_client.copy(source_path, destination_path, part_size=3000000000)
        # That worked while every file stayed below part_size, but the LMS and ecommerce exports
        # contain files that exceed the limit by a wide margin, and the resulting multi-part
        # uploads failed with "SignatureDoesNotMatch" errors from S3.  Since this code still uses
        # the older boto library rather than boto3 (which would require a Luigi upgrade), it was
        # easier to install awscli and shell out to it to copy each table's data.
command = 'aws s3 cp {source_path} {destination_path} --recursive'.format(
source_path=source_path, destination_path=destination_path
)
        try:
            log.info("Calling '{}'".format(command))
            return_val = subprocess.check_call(command, shell=True)
            log.info("Call returned '{}'".format(return_val))
        except subprocess.CalledProcessError as exception:
            log.error("Copy of '{}' failed: {}".format(table_name, exception))
            raise

def copy_metadata_file(self):
self.copy_table(DUMP_METADATA_OUTPUT)

def run(self):
if self.new_warehouse_path == self.warehouse_path:
            raise Exception("Must set new_warehouse_path {} to be different from warehouse_path {}".format(self.new_warehouse_path, self.warehouse_path))

for table_name in self.get_table_list_for_database():
self.copy_table(table_name)

self.copy_metadata_file()
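
As a possible hardening of the subprocess call above (purely a sketch, not something this diff does), the awscli invocation could be made without shell=True by passing an argument list, and could use `aws s3 sync` instead of `aws s3 cp --recursive`, since sync skips objects already present at the destination and so makes task retries cheaper. The helper name below is an illustrative assumption.

# Sketch only: the same awscli copy without shell=True, using `aws s3 sync`.
import subprocess


def sync_s3_prefix(source_path, destination_path):
    """Mirror one S3 prefix to another using the awscli in a subprocess."""
    # Passing an argument list avoids shell quoting issues; sync skips unchanged objects.
    command = ['aws', 's3', 'sync', source_path, destination_path]
    subprocess.check_call(command)
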
4 changes: 4 additions & 0 deletions requirements/default.txt
@@ -99,3 +99,7 @@ yarn-api-client==0.2.3

# The following packages are considered to be unsafe in a requirements file:
# setuptools==41.4.0 # via google-cloud-core, protobuf, python-daemon

# Added here for the standalone S3-to-S3 copy tasks as a hack.
# The version is pinned to stay compatible with the boto requirements above.
awscli==1.14.46