Implements BigQuery metadata extractor (#51)
* Implements BigQuery metadata extractor

* Increment version

* Increment version correctly to 1.2.0

* Set version to 1.1.0
gtoonstra authored and jinhyukchang committed May 21, 2019
1 parent bbefcd8 commit bfb69f0
Showing 4 changed files with 520 additions and 1 deletion.
194 changes: 194 additions & 0 deletions databuilder/extractor/bigquery_metadata_extractor.py
@@ -0,0 +1,194 @@
import logging
from collections import namedtuple

import google.auth
import google.oauth2.service_account
import google_auth_httplib2
from googleapiclient.discovery import build
import httplib2
from pyhocon import ConfigTree # noqa: F401
from typing import List, Any # noqa: F401

from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata


DatasetRef = namedtuple('DatasetRef', ['datasetId', 'projectId'])
TableKey = namedtuple('TableKey', ['schema_name', 'table_name'])

LOGGER = logging.getLogger(__name__)


class BigQueryMetadataExtractor(Extractor):

""" A metadata extractor for bigquery tables, taking the schema metadata
from the google cloud bigquery API's. This extractor goes through all visible
datasets in the project identified by project_id and iterates over all tables
it finds. A separate account is configurable through the key_path parameter,
which should point to a valid json file corresponding to a service account.
This extractor supports nested columns, which are delimited by a dot (.) in the
column name.
"""

PROJECT_ID_KEY = 'project_id'
KEY_PATH_KEY = 'key_path'
PAGE_SIZE_KEY = 'page_size'
FILTER_KEY = 'filter'
    _DEFAULT_SCOPES = ['https://www.googleapis.com/auth/bigquery.readonly']
DEFAULT_PAGE_SIZE = 300
NUM_RETRIES = 3

def init(self, conf):
# type: (ConfigTree) -> None
self.key_path = conf.get_string(BigQueryMetadataExtractor.KEY_PATH_KEY, None)
self.project_id = conf.get_string(BigQueryMetadataExtractor.PROJECT_ID_KEY)
self.pagesize = conf.get_int(
BigQueryMetadataExtractor.PAGE_SIZE_KEY,
BigQueryMetadataExtractor.DEFAULT_PAGE_SIZE)
self.filter = conf.get_string(BigQueryMetadataExtractor.FILTER_KEY, '')

if self.key_path:
credentials = (
google.oauth2.service_account.Credentials.from_service_account_file(
self.key_path, scopes=BigQueryMetadataExtractor._DEFAULT_SCOPES))
else:
credentials, _ = google.auth.default(scopes=BigQueryMetadataExtractor._DEFAULT_SCOPES)

http = httplib2.Http()
authed_http = google_auth_httplib2.AuthorizedHttp(credentials, http=http)
self.bigquery_service = build('bigquery', 'v2', http=authed_http, cache_discovery=False)
self.datasets = self._retrieve_datasets()
self.iter = iter(self._iterate_over_tables())

def extract(self):
# type: () -> Any
try:
return next(self.iter)
except StopIteration:
return None

def _iterate_over_tables(self):
# type: () -> Any
for dataset in self.datasets:
for entry in self._retrieve_tables(dataset):
                yield entry

def _retrieve_datasets(self):
# type: () -> List[DatasetRef]
datasets = []
for page in self._page_dataset_list_results():
if 'datasets' not in page:
continue

for dataset in page['datasets']:
dataset_ref = dataset['datasetReference']
ref = DatasetRef(**dataset_ref)
datasets.append(ref)

return datasets

def _page_dataset_list_results(self):
# type: () -> Any
response = self.bigquery_service.datasets().list(
projectId=self.project_id,
all=False, # Do not return hidden datasets
filter=self.filter,
maxResults=self.pagesize).execute(
num_retries=BigQueryMetadataExtractor.NUM_RETRIES)

while response:
yield response

if 'nextPageToken' in response:
response = self.bigquery_service.datasets().list(
projectId=self.project_id,
                    all=False,  # keep hidden datasets excluded on subsequent pages too
                    filter=self.filter,
                    maxResults=self.pagesize,
pageToken=response['nextPageToken']).execute(
num_retries=BigQueryMetadataExtractor.NUM_RETRIES)
else:
response = None

def _retrieve_tables(self, dataset):
        # type: (DatasetRef) -> Any
for page in self._page_table_list_results(dataset):
if 'tables' not in page:
continue

for table in page['tables']:
tableRef = table['tableReference']
table = self.bigquery_service.tables().get(
projectId=tableRef['projectId'],
datasetId=tableRef['datasetId'],
tableId=tableRef['tableId']).execute(num_retries=BigQueryMetadataExtractor.NUM_RETRIES)

# BigQuery tables also have interesting metadata about partitioning
# data location (EU/US), mod/create time, etc... Extract that some other time?
schema = table['schema']
cols = []
if 'fields' in schema:
total_cols = 0
for column in schema['fields']:
                        # sort orders start at 0; _iterate_over_cols returns the next free one
                        total_cols = self._iterate_over_cols('', column, cols, total_cols)

table_meta = TableMetadata(
database='bigquery',
cluster=tableRef['projectId'],
schema_name=tableRef['datasetId'],
name=tableRef['tableId'],
description=table.get('description', ''),
columns=cols,
is_view=table['type'] == 'VIEW')

                yield table_meta

def _iterate_over_cols(self, parent, column, cols, total_cols):
        # type: (str, dict, List[ColumnMetadata], int) -> int
if len(parent) > 0:
col_name = '{parent}.{field}'.format(parent=parent, field=column['name'])
else:
col_name = column['name']

if column['type'] == 'RECORD':
col = ColumnMetadata(
name=col_name,
description=column.get('description', ''),
col_type=column['type'],
sort_order=total_cols)
cols.append(col)
total_cols += 1
            for field in column['fields']:
                total_cols = self._iterate_over_cols(col_name, field, cols, total_cols)
            # The recursive calls above already advanced the counter for the
            # nested fields, so return it unchanged to keep sort orders contiguous.
            return total_cols
        else:
            col = ColumnMetadata(
                name=col_name,
                description=column.get('description', ''),
                col_type=column['type'],
                sort_order=total_cols)
            cols.append(col)
            return total_cols + 1

def _page_table_list_results(self, dataset):
# type: (DatasetRef) -> Any
response = self.bigquery_service.tables().list(
projectId=dataset.projectId,
datasetId=dataset.datasetId,
maxResults=self.pagesize).execute(
num_retries=BigQueryMetadataExtractor.NUM_RETRIES)

while response:
yield response

if 'nextPageToken' in response:
                response = self.bigquery_service.tables().list(
                    projectId=dataset.projectId,
                    datasetId=dataset.datasetId,
                    maxResults=self.pagesize,
pageToken=response['nextPageToken']).execute(
num_retries=BigQueryMetadataExtractor.NUM_RETRIES)
else:
response = None

def get_scope(self):
# type: () -> str
return 'extractor.bigquery_table_metadata'
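
To illustrate the nested-column handling described in the extractor's docstring, the sketch below mirrors the traversal that _iterate_over_cols performs: RECORD fields flatten into dot-delimited column names with contiguous sort orders. The walk_fields helper and the sample_schema fragment are illustrative stand-ins written for this note, not part of the committed module.

# Illustrative sketch only: mimics how _iterate_over_cols flattens a BigQuery schema.
# sample_schema is a made-up fragment in the shape the BigQuery API returns.
sample_schema = {
    'fields': [
        {'name': 'id', 'type': 'INTEGER'},
        {'name': 'address', 'type': 'RECORD', 'fields': [
            {'name': 'city', 'type': 'STRING'},
            {'name': 'zip', 'type': 'STRING'},
        ]},
    ]
}


def walk_fields(parent, field, out, sort_order):
    # Build the dotted column name, e.g. 'address.city' for nested fields.
    name = '{}.{}'.format(parent, field['name']) if parent else field['name']
    out.append((name, field['type'], sort_order))
    sort_order += 1
    for child in field.get('fields', []):
        sort_order = walk_fields(name, child, out, sort_order)
    return sort_order


columns = []
order = 0
for top_level in sample_schema['fields']:
    order = walk_fields('', top_level, columns, order)

# columns == [('id', 'INTEGER', 0), ('address', 'RECORD', 1),
#             ('address.city', 'STRING', 2), ('address.zip', 'STRING', 3)]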
81 changes: 81 additions & 0 deletions example/scripts/sample_bigquery_metadata.py
@@ -0,0 +1,81 @@
"""
This is a example script for extracting BigQuery usage results
"""

import logging
from pyhocon import ConfigFactory
import sqlite3

from databuilder.extractor.bigquery_metadata_extractor import BigQueryMetadataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer

logging.basicConfig(level=logging.INFO)

# replace localhost with docker host ip
# todo: get the ip from input argument
NEO4J_ENDPOINT = 'bolt://localhost:7687'
neo4j_endpoint = NEO4J_ENDPOINT

neo4j_user = 'neo4j'
neo4j_password = 'test'


def create_connection(db_file):
try:
conn = sqlite3.connect(db_file)
return conn
except Exception:
logging.exception('exception')
return None


# todo: Add a second model
def create_bq_job(metadata_type, gcloud_project):
tmp_folder = '/var/tmp/amundsen/{metadata_type}'.format(metadata_type=metadata_type)
node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)

bq_meta_extractor = BigQueryMetadataExtractor()
csv_loader = FsNeo4jCSVLoader()

task = DefaultTask(extractor=bq_meta_extractor,
loader=csv_loader,
transformer=NoopTransformer())

job_config = ConfigFactory.from_dict({
'extractor.bigquery_table_metadata.{}'.format(BigQueryMetadataExtractor.PROJECT_ID_KEY):
gcloud_project,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH):
node_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH):
relationship_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR):
True,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR):
node_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR):
relationship_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY):
neo4j_endpoint,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER):
neo4j_user,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD):
neo4j_password,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG):
'unique_tag', # should use unique tag here like {ds}
})
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
return job


if __name__ == "__main__":
# start table job
job1 = create_bq_job('bigquery_metadata', 'your-project-here')
job1.launch()
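
If the extractor should authenticate with an explicit service account key rather than application default credentials, a config entry for BigQueryMetadataExtractor.KEY_PATH_KEY can be layered onto the job config built in create_bq_job. This is a minimal sketch under that assumption; the key file path is a placeholder, and the commented merge call is just one possible way to combine the trees with pyhocon.

from pyhocon import ConfigFactory

from databuilder.extractor.bigquery_metadata_extractor import BigQueryMetadataExtractor

# Illustrative only: '/path/to/service_account.json' is a placeholder path.
key_conf = ConfigFactory.from_dict({
    'extractor.bigquery_table_metadata.{}'.format(BigQueryMetadataExtractor.KEY_PATH_KEY):
        '/path/to/service_account.json',
})

# One way (assumption) to layer this onto the job config before launching:
# job_config = key_conf.with_fallback(job_config)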
2 changes: 1 addition & 1 deletion setup.py
@@ -1,7 +1,7 @@
from setuptools import setup, find_packages


__version__ = '1.0.15'
__version__ = '1.1.0'


setup(