diff --git a/apps/beeswax/src/beeswax/common.py b/apps/beeswax/src/beeswax/common.py index e3401e86ff1..c4d9c98473a 100644 --- a/apps/beeswax/src/beeswax/common.py +++ b/apps/beeswax/src/beeswax/common.py @@ -20,15 +20,15 @@ """ from __future__ import print_function -import numbers import re import time +import numbers from django import forms -from beeswax.models import Namespace, Compute +from beeswax.models import Compute, Namespace -HIVE_IDENTIFER_REGEX = re.compile("(^[a-zA-Z0-9]\w*\.)?[a-zA-Z0-9]\w*$") +HIVE_IDENTIFER_REGEX = re.compile(r"(^[a-zA-Z0-9]\w*\.)?[a-zA-Z0-9]\w*$") DL_FORMATS = ['csv', 'xls'] @@ -56,12 +56,13 @@ (' ', "Space", 32), ] + def timing(fn): def decorator(*args, **kwargs): time1 = time.time() ret = fn(*args, **kwargs) time2 = time.time() - print('%s elapsed time: %0.3f ms' % (fn.__name__, (time2-time1)*1000.0)) + print('%s elapsed time: %0.3f ms' % (fn.__name__, (time2 - time1) * 1000.0)) return ret return decorator @@ -79,7 +80,8 @@ def apply_natural_sort(collection, key=None): Applies a natural sort (http://rosettacode.org/wiki/Natural_sorting) to a list or dictionary Dictionary types require a sort key to be specified """ - to_digit = lambda i: int(i) if i.isdigit() else i + def to_digit(i): + return int(i) if i.isdigit() else i def tokenize_and_convert(item, key=None): if key: @@ -89,13 +91,26 @@ def tokenize_and_convert(item, key=None): return sorted(collection, key=lambda i: tokenize_and_convert(i, key=key)) -def is_compute(cluster): +def find_compute_in_cluster(cluster): if not cluster: - return False + return None connector = cluster.get('connector') compute = cluster.get('compute') - compute_check = lambda x: x and x.get('type') in COMPUTE_TYPES - return compute_check(cluster) or compute_check(connector) or compute_check(compute) + + def _compute_check(x): + return x and x.get('type') in COMPUTE_TYPES + + return ( + cluster if _compute_check(cluster) + else compute if _compute_check(compute) + else connector if _compute_check(connector) else None) + + +def extract_session_type(snippet): + compute = find_compute_in_cluster(snippet) + if compute and compute.get('name'): + return compute['name'] + return snippet.get('type') if snippet else None ''' @@ -107,17 +122,16 @@ def is_compute(cluster): 3. Lookup namespace based on dialect from cluster or prpvided dialect and return the first compute filtered by user-access. Needs valid user ''' + + def find_compute(cluster=None, user=None, dialect=None, namespace_id=None): if cluster: # If we find a full/partial cluster object, we will attempt to load a compute connector = cluster.get('connector') compute = cluster.get('compute') - compute_check = lambda x: x and x.get('type') in COMPUTE_TYPES # Pick the most probable compute object - selected_compute = (cluster if compute_check(cluster) - else compute if compute_check(compute) - else connector if compute_check(connector) else None) + selected_compute = find_compute_in_cluster(cluster) # If found, we will attempt to reload it, first by id then by name if selected_compute: diff --git a/apps/beeswax/src/beeswax/server/dbms.py b/apps/beeswax/src/beeswax/server/dbms.py index df279adcd96..7f75f55501f 100644 --- a/apps/beeswax/src/beeswax/server/dbms.py +++ b/apps/beeswax/src/beeswax/server/dbms.py @@ -15,19 +15,56 @@ # See the License for the specific language governing permissions and # limitations under the License. 
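# Illustrative usage of the two new helpers added to beeswax/common.py above (a
# sketch, not part of the patch). 'hive-compute' is a placeholder value and only
# matches if it appears in beeswax.common.COMPUTE_TYPES; running this also
# assumes a configured Hue environment where beeswax is importable.
from beeswax.common import extract_session_type, find_compute_in_cluster

snippet = {
    'type': 'impala',
    'compute': {'type': 'hive-compute', 'name': 'my-warehouse', 'id': 7},
}

# find_compute_in_cluster() checks the dict itself, then its 'compute', then its
# 'connector', and returns the first one whose 'type' is in COMPUTE_TYPES
# (None when nothing matches), replacing the old boolean is_compute().
compute = find_compute_in_cluster(snippet)

# extract_session_type() prefers the matched compute's 'name' and falls back to
# the snippet's own 'type' ('impala' here) when no named compute is attached.
session_type = extract_session_type(snippet)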
-from builtins import object -import logging import re import sys -import threading -import time import json +import time +import logging +import threading +from builtins import object from django.core.cache import caches from django.urls import reverse from kazoo.client import KazooClient -from beeswax.models import Compute +from azure.abfs import abfspath +from beeswax.common import apply_natural_sort, find_compute_in_cluster +from beeswax.conf import ( + APPLY_NATURAL_SORT_MAX, + AUTH_PASSWORD, + AUTH_USERNAME, + CACHE_TIMEOUT, + CLOSE_SESSIONS, + HIVE_DISCOVERY_HIVESERVER2_ZNODE, + HIVE_DISCOVERY_HS2, + HIVE_DISCOVERY_LLAP, + HIVE_DISCOVERY_LLAP_HA, + HIVE_DISCOVERY_LLAP_ZNODE, + HIVE_HTTP_THRIFT_PORT, + HIVE_METASTORE_HOST, + HIVE_METASTORE_PORT, + HIVE_SERVER_HOST, + HIVE_SERVER_PORT, + LIST_PARTITIONS_LIMIT, + LLAP_SERVER_HOST, + LLAP_SERVER_PORT, + LLAP_SERVER_THRIFT_PORT, + MAX_NUMBER_OF_SESSIONS, + QUERY_PARTITIONS_LIMIT, + SERVER_CONN_TIMEOUT, + USE_SASL as HIVE_USE_SASL, + ZOOKEEPER_CONN_TIMEOUT, + has_session_pool, +) +from beeswax.design import hql_query +from beeswax.hive_site import ( + get_hiveserver2_kerberos_principal, + hiveserver2_impersonation_enabled, + hiveserver2_thrift_http_path, + hiveserver2_transport_mode, + hiveserver2_use_ssl, +) +from beeswax.models import QUERY_TYPES, Compute, QueryHistory from desktop.conf import CLUSTER_ID, has_connectors from desktop.lib.django_util import format_preserving_redirect from desktop.lib.exceptions_renderable import PopupException @@ -38,20 +75,6 @@ from indexer.file_format import HiveFormat from libzookeeper import conf as libzookeeper_conf -from azure.abfs import abfspath -from beeswax.conf import HIVE_SERVER_HOST, HIVE_SERVER_PORT, HIVE_SERVER_HOST, HIVE_HTTP_THRIFT_PORT, HIVE_METASTORE_HOST, \ - HIVE_METASTORE_PORT, LIST_PARTITIONS_LIMIT, SERVER_CONN_TIMEOUT, ZOOKEEPER_CONN_TIMEOUT, \ - AUTH_USERNAME, AUTH_PASSWORD, APPLY_NATURAL_SORT_MAX, QUERY_PARTITIONS_LIMIT, HIVE_DISCOVERY_HIVESERVER2_ZNODE, \ - HIVE_DISCOVERY_HS2, HIVE_DISCOVERY_LLAP, HIVE_DISCOVERY_LLAP_HA, HIVE_DISCOVERY_LLAP_ZNODE, CACHE_TIMEOUT, \ - LLAP_SERVER_HOST, LLAP_SERVER_PORT, LLAP_SERVER_THRIFT_PORT, USE_SASL as HIVE_USE_SASL, CLOSE_SESSIONS, has_session_pool, \ - MAX_NUMBER_OF_SESSIONS -from beeswax.common import apply_natural_sort, is_compute -from beeswax.design import hql_query -from beeswax.hive_site import hiveserver2_use_ssl, hiveserver2_impersonation_enabled, get_hiveserver2_kerberos_principal, \ - hiveserver2_transport_mode, hiveserver2_thrift_http_path -from beeswax.models import QueryHistory, QUERY_TYPES - - if sys.version_info[0] > 2: from django.utils.encoding import force_str else: @@ -70,6 +93,7 @@ DBMS_CACHE_LOCK = threading.Lock() cache = caches[CACHES_HIVE_DISCOVERY_KEY] + # Using file cache to make sure eventlet threads are uniform, this cache is persistent on startup # So we clear it to make sure the server resets hiveserver2 host. 
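# Sketch of the routing change in get_query_server_config() further down this
# file: the gate switches from is_compute(connector) to
# find_compute_in_cluster(connector), so a cluster dict carrying a recognised
# compute takes the connector path even when has_connectors() is off. The
# payload below is illustrative only and assumes the compute is registered in
# the Hue database.
from beeswax.server.dbms import get_query_server_config

connector = {
    'type': 'hive-compute',  # placeholder; must be in beeswax.common.COMPUTE_TYPES
    'name': 'hive',
    'compute': {'type': 'hive-compute', 'name': 'my-warehouse', 'id': 7},
}

# Routed to get_query_server_config_via_connector(), which reloads the compute
# by id from the db before building the query server settings.
query_server = get_query_server_config(name='hive', connector=connector)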
def reset_ha(): @@ -148,7 +172,7 @@ def get(user, query_server=None, cluster=None): def get_query_server_config(name='beeswax', connector=None): - if connector and (has_connectors() or is_compute(connector)): + if connector and (has_connectors() or find_compute_in_cluster(connector)): LOG.debug("Query via connector %s (%s)" % (name, connector.get('type'))) query_server = get_query_server_config_via_connector(connector) else: @@ -316,7 +340,7 @@ def get_query_server_config_via_connector(connector): connector_name = connector['type'] compute_name = compute['name'] if compute.get('id'): - compute = Compute.objects.get(id=compute['id']).to_dict() #Reload the full compute from db + compute = Compute.objects.get(id=compute['id']).to_dict() # Reload the full compute from db LOG.debug("Query cluster connector %s compute %s" % (connector_name, compute)) if compute['options'].get('has_ssh') == 'true': @@ -384,7 +408,6 @@ def __init__(self, client, server_type): self.server_name = self.client.query_server.get('dialect') if self.client.query_server['server_name'].isdigit() \ else self.client.query_server['server_name'] - @classmethod def to_matching_wildcard(cls, identifier=None): cleaned = "*" @@ -392,7 +415,6 @@ def to_matching_wildcard(cls, identifier=None): cleaned = "*%s*" % identifier.strip().strip("*") return cleaned - def get_databases(self, database_names='*'): if database_names != '*': database_names = self.to_matching_wildcard(database_names) @@ -404,11 +426,9 @@ def get_databases(self, database_names='*'): return databases - def get_database(self, database): return self.client.get_database(database) - def alter_database(self, database, properties): hql = 'ALTER database `%s` SET DBPROPERTIES (' % database hql += ', '.join(["'%s'='%s'" % (k, v) for k, v in list(properties.items())]) @@ -426,7 +446,6 @@ def alter_database(self, database, properties): return self.client.get_database(database) - def get_tables_meta(self, database='default', table_names='*', table_types=None): database = database.lower() # Impala is case sensitive @@ -444,7 +463,6 @@ def get_tables_meta(self, database='default', table_names='*', table_types=None) tables = apply_natural_sort(tables, key='name') return tables - def get_tables(self, database='default', table_names='*', table_types=None): database = database.lower() # Impala is case sensitive @@ -459,7 +477,6 @@ def get_tables(self, database='default', table_names='*', table_types=None): tables = apply_natural_sort(tables) return tables - def _get_tables_via_sparksql(self, database, table_names='*'): hql = "SHOW TABLES IN %s" % database if table_names != '*': @@ -486,7 +503,6 @@ def _get_tables_via_sparksql(self, database, table_names='*'): else: return [] - def get_table(self, database, table_name): try: return self.client.get_table(database, table_name) @@ -505,7 +521,6 @@ def get_table(self, database, table_name): else: raise e - def alter_table(self, database, table_name, new_table_name=None, comment=None, tblproperties=None): table_obj = self.get_table(database, table_name) if table_obj is None: @@ -536,7 +551,6 @@ def alter_table(self, database, table_name, new_table_name=None, comment=None, t return self.client.get_table(database, table_name) - def get_column(self, database, table_name, column_name): table = self.get_table(database, table_name) for col in table.cols: @@ -544,7 +558,6 @@ def get_column(self, database, table_name, column_name): return col return None - def alter_column(self, database, table_name, column_name, new_column_name, column_type, 
comment=None, partition_spec=None, cascade=False): hql = 'ALTER TABLE `%s`.`%s`' % (database, table_name) @@ -571,27 +584,23 @@ def alter_column(self, database, table_name, column_name, new_column_name, colum return self.get_column(database, table_name, new_column_name) - def execute_query(self, query, design): return self.execute_and_watch(query, design=design) - def select_star_from(self, database, table, limit=1000): if table.partition_keys: # Filter on max number of partitions for partitioned tables - hql = self._get_sample_partition_query(database, table, limit=limit) # Currently need a limit + hql = self._get_sample_partition_query(database, table, limit=limit) # Currently need a limit else: hql = "SELECT * FROM `%s`.`%s` LIMIT %d;" % (database, table.name, limit) return self.execute_statement(hql) - def get_select_star_query(self, database, table, limit=1000): if table.partition_keys: # Filter on max number of partitions for partitioned tables - hql = self._get_sample_partition_query(database, table, limit=limit) # Currently need a limit + hql = self._get_sample_partition_query(database, table, limit=limit) # Currently need a limit else: hql = "SELECT * FROM `%s`.`%s` LIMIT %d;" % (database, table.name, limit) return hql - def execute_statement(self, hql): if self.server_name.startswith('impala'): query = hql_query(hql, QUERY_TYPES[1]) @@ -599,7 +608,6 @@ def execute_statement(self, hql): query = hql_query(hql, QUERY_TYPES[0]) return self.execute_and_watch(query) - def fetch(self, query_handle, start_over=False, rows=None): no_start_over_support = [ config_variable @@ -611,15 +619,12 @@ def fetch(self, query_handle, start_over=False, rows=None): return self.client.fetch(query_handle, start_over, rows) - def close_operation(self, query_handle): return self.client.close_operation(query_handle) - def open_session(self, user): return self.client.open_session(user) - def close_session(self, session): resp = self.client.close_session(session) @@ -633,14 +638,12 @@ def close_session(self, session): return session - def cancel_operation(self, query_handle): resp = self.client.cancel_operation(query_handle) if self.client.query_server.get('dialect') == 'impala': resp = self.client.close_operation(query_handle) return resp - def get_sample(self, database, table, column=None, nested=None, limit=100, generate_sql_only=False, operation=None): result = None hql = None @@ -689,7 +692,6 @@ def get_sample(self, database, table, column=None, nested=None, limit=100, gener return result - def _get_sample_partition_query(self, database, table, column='*', limit=100, operation=None): max_parts = QUERY_PARTITIONS_LIMIT.get() partitions = self.get_partitions(database, table, partition_spec=None, max_parts=max_parts) @@ -714,7 +716,6 @@ def _get_sample_partition_query(self, database, table, column='*', limit=100, op return prefix + " FROM `%(database)s`.`%(table)s` %(partition_clause)s LIMIT %(limit)s" % \ {'database': database, 'table': table.name, 'partition_clause': partition_clause, 'limit': limit} - def analyze_table(self, database, table): if self.server_name.startswith('impala'): hql = 'COMPUTE STATS `%(database)s`.`%(table)s`' % {'database': database, 'table': table} @@ -730,7 +731,6 @@ def analyze_table(self, database, table): return self.execute_statement(hql) - def analyze_table_columns(self, database, table): if self.server_name.startswith('impala'): hql = 'COMPUTE STATS `%(database)s`.`%(table)s`' % {'database': database, 'table': table} @@ -743,7 +743,6 @@ def analyze_table_columns(self, 
database, table): return self.execute_statement(hql) - def get_table_stats(self, database, table): stats = [] @@ -763,7 +762,6 @@ def get_table_stats(self, database, table): return stats - def get_table_columns_stats(self, database, table, column): if self.server_name.startswith('impala'): hql = 'SHOW COLUMN STATS `%(database)s`.`%(table)s`' % {'database': database, 'table': table} @@ -779,7 +777,7 @@ def get_table_columns_stats(self, database, table, column): data = list(result.rows()) if self.server_name.startswith('impala'): - if column == -1: # All the columns + if column == -1: # All the columns return [self._extract_impala_column(col) for col in data] else: data = [col for col in data if col[0] == column][0] @@ -823,7 +821,6 @@ def get_table_properties(self, database, table, property_name=None): self.close(handle) return result - def get_table_describe(self, database, table): hql = 'DESCRIBE `%s`.`%s`' % (database, table) @@ -835,7 +832,6 @@ def get_table_describe(self, database, table): self.close(handle) return result - def get_top_terms(self, database, table, column, limit=30, prefix=None): limit = min(limit, 100) prefix_match = '' @@ -852,7 +848,7 @@ def get_top_terms(self, database, table, column, limit=30, prefix=None): } query = hql_query(hql) - handle = self.execute_and_wait(query, timeout_sec=60.0) # Hive is very slow + handle = self.execute_and_wait(query, timeout_sec=60.0) # Hive is very slow if handle: result = self.fetch(handle, rows=limit) @@ -861,7 +857,6 @@ def get_top_terms(self, database, table, column, limit=30, prefix=None): else: return [] - def drop_table(self, database, table): if table.is_view: hql = "DROP VIEW `%s`.`%s`" % (database, table.name,) @@ -870,11 +865,10 @@ def drop_table(self, database, table): return self.execute_statement(hql) - def load_data(self, database, table, form_data, design, generate_ddl_only=False): hql = "LOAD DATA INPATH" source_path = "%(path)s" % form_data - if source_path.lower().startswith("abfs"): #this is to check if its using an ABFS path + if source_path.lower().startswith("abfs"): # this is to check if its using an ABFS path source_path = abfspath(source_path) hql += " '%s'" % source_path if form_data['overwrite']: @@ -896,7 +890,6 @@ def load_data(self, database, table, form_data, design, generate_ddl_only=False) return self.execute_query(query, design) - def drop_tables(self, database, tables, design, skip_trash=False, generate_ddl_only=False): hql = [] @@ -918,11 +911,9 @@ def drop_tables(self, database, tables, design, skip_trash=False, generate_ddl_o return self.execute_query(query, design) - def drop_database(self, database): return self.execute_statement("DROP DATABASE `%s`" % database) - def drop_databases(self, databases, design, generate_ddl_only=False): hql = [] @@ -953,7 +944,6 @@ def insert_query_into_directory(self, query_history, target_dir): hql = "INSERT OVERWRITE DIRECTORY '%s' %s" % (target_dir, query) return self.execute_statement(hql) - def create_table_as_a_select(self, request, query_history, target_database, target_table, result_meta): design = query_history.design.get_design() database = design.query['database'] @@ -984,7 +974,7 @@ def create_table_as_a_select(self, request, query_history, target_database, targ if not delim.isdigit(): delim = str(ord(delim)) - hql = ''' + hql = r''' CREATE TABLE `%s` ( %s ) @@ -1016,23 +1006,18 @@ def create_table_as_a_select(self, request, query_history, target_database, targ return query_history - def use(self, database, session=None): query = hql_query('USE `%s`' 
% database) return self.client.use(query, session=session) - - def get_log(self, query_handle, start_over=True): - return self.client.get_log(query_handle, start_over) - + def get_log(self, query_handle, start_over=True, session=None): + return self.client.get_log(query_handle, start_over, session=session) def get_state(self, handle): return self.client.get_state(handle) - - def get_operation_status(self, handle): - return self.client.get_operation_status(handle) - + def get_operation_status(self, handle, session=None): + return self.client.get_operation_status(handle, session=session) def execute_and_wait(self, query, timeout_sec=30.0, sleep_interval=0.5): """ @@ -1064,7 +1049,6 @@ def execute_and_wait(self, query, timeout_sec=30.0, sleep_interval=0.5): raise QueryServerTimeoutException(message=msg) - def execute_next_statement(self, query_history, hql_query): if query_history.is_success() or query_history.is_expired(): # We need to go to the next statement only if the previous one passed @@ -1084,7 +1068,6 @@ def execute_next_statement(self, query_history, hql_query): return self.execute_and_watch(query, query_history=query_history) - def execute_and_watch(self, query, design=None, query_history=None): """ Run query and return a QueryHistory object in order to see its progress on a Web page. @@ -1140,24 +1123,20 @@ def execute_and_watch(self, query, design=None, query_history=None): return query_history - def get_results_metadata(self, handle): return self.client.get_results_metadata(handle) - def close(self, handle): return self.client.close(handle) - def get_partitions(self, db_name, table, partition_spec=None, max_parts=None, reverse_sort=True): if max_parts is None or max_parts > LIST_PARTITIONS_LIMIT.get(): max_parts = LIST_PARTITIONS_LIMIT.get() return self.client.get_partitions(db_name, table.name, partition_spec, max_parts=max_parts, reverse_sort=reverse_sort) - def get_partition(self, db_name, table_name, partition_spec, generate_ddl_only=False): - if partition_spec and self.server_name.startswith('impala'): # partition_spec not supported + if partition_spec and self.server_name.startswith('impala'): # partition_spec not supported partition_query = " AND ".join(partition_spec.split(',')) else: table = self.get_table(db_name, table_name) @@ -1176,11 +1155,9 @@ def get_partition(self, db_name, table_name, partition_spec, generate_ddl_only=F else: return self.execute_statement(hql) - def describe_partition(self, db_name, table_name, partition_spec): return self.client.get_table(db_name, table_name, partition_spec=partition_spec) - def drop_partitions(self, db_name, table_name, partition_specs, design=None, generate_ddl_only=False): hql = [] @@ -1198,7 +1175,6 @@ def drop_partitions(self, db_name, table_name, partition_specs, design=None, gen return self.execute_query(query, design) - def get_indexes(self, db_name, table_name): hql = 'SHOW FORMATTED INDEXES ON `%(table)s` IN `%(database)s`' % {'table': table_name, 'database': db_name} @@ -1211,11 +1187,9 @@ def get_indexes(self, db_name, table_name): return result - def get_configuration(self): return self.client.get_configuration() - def get_functions(self, prefix=None, database=None): ''' Not using self.client.get_functions() as pretty limited. More comments there. 
@@ -1242,7 +1216,6 @@ def get_functions(self, prefix=None, database=None): return rows - def get_function(self, name): hql = 'DESCRIBE FUNCTION EXTENDED `%(name)s`' % { 'name': name, @@ -1257,7 +1230,6 @@ def get_function(self, name): return rows - def get_query_metadata(self, query): hql = 'SELECT * FROM ( %(query)s ) t LIMIT 0' % {'query': query.strip(';')} @@ -1270,11 +1242,9 @@ def get_query_metadata(self, query): return result - def explain(self, query): return self.client.explain(query) - def get_primary_keys(self, database_name, table_name, catalog_name=None): return self.client.get_primary_keys( @@ -1283,7 +1253,6 @@ def get_primary_keys(self, database_name, table_name, catalog_name=None): catalog_name=catalog_name ) - def get_foreign_keys(self, parent_catalog_name=None, parent_database_name=None, parent_table_name=None, foreign_catalog_name=None, foreign_database_name=None, foreign_table_name=None): @@ -1296,11 +1265,9 @@ def get_foreign_keys(self, parent_catalog_name=None, parent_database_name=None, foreign_table_name=foreign_table_name ) - def get_status(self): return self.client.getStatus() - def get_default_configuration(self, include_hadoop): return self.client.get_default_configuration(include_hadoop) @@ -1331,7 +1298,7 @@ def __init__(self, db, query): self.name = 'Test' cols = db.get_query_metadata(query).data_table.cols() for col in cols: - col.name = re.sub('^t\.', '', col.name) + col.name = re.sub(r'^t\.', '', col.name) col.type = HiveFormat.FIELD_TYPE_TRANSLATE.get(col.type, 'string') self.cols = cols self.hdfs_link = None diff --git a/apps/beeswax/src/beeswax/server/hive_metastore_server.py b/apps/beeswax/src/beeswax/server/hive_metastore_server.py index 36d62c5edae..35510cd2b77 100644 --- a/apps/beeswax/src/beeswax/server/hive_metastore_server.py +++ b/apps/beeswax/src/beeswax/server/hive_metastore_server.py @@ -15,26 +15,24 @@ # See the License for the specific language governing permissions and # limitations under the License. 
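# Context for the raw-string fix in HiveTableSchema above: get_query_metadata()
# wraps the user's query as  SELECT * FROM ( <query> ) t LIMIT 0,  so every
# column in the result metadata comes back prefixed with the alias "t.". A
# minimal, standard-library-only sketch of that cleanup:
import re

cols = ['t.customer_id', 't.order_total', 'already_clean']
print([re.sub(r'^t\.', '', name) for name in cols])
# ['customer_id', 'order_total', 'already_clean']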
-from builtins import object -import logging import re import sys -import thrift - -from django.utils.encoding import smart_str, force_unicode - -import hadoop.cluster +import logging +from builtins import object -from desktop.lib import thrift_util -from desktop.conf import KERBEROS +import thrift +from django.utils.encoding import force_unicode, smart_str from hive_metastore import ThriftHiveMetastore from TCLIService.ttypes import TOperationState +import hadoop.cluster from beeswax import hive_site from beeswax.conf import SERVER_CONN_TIMEOUT -from beeswax.server.hive_server2_lib import ResultCompatible from beeswax.models import HiveServerQueryHandle, QueryHistory -from beeswax.server.dbms import Table, DataTable +from beeswax.server.dbms import DataTable, Table +from beeswax.server.hive_server2_lib import ResultCompatible +from desktop.conf import KERBEROS +from desktop.lib import thrift_util if sys.version_info[0] > 2: from django.utils.translation import gettext as _ @@ -105,7 +103,6 @@ def parse_result_row(row): yield parse_result_row(row) - class HiveMetastoreClient(object): def __init__(self, query_server, user): @@ -113,15 +110,12 @@ def __init__(self, query_server, user): self.query_server = query_server self.meta_client = self.meta_client() - def get_databases(self, *args, **kwargs): return self.meta_client.get_all_databases() - def get_tables(self, *args, **kwargs): return self.meta_client.get_tables(*args, **kwargs) - def get_tables_meta(self, *args, **kwargs): meta_tables = self.meta_client.get_table_meta(*args, **kwargs) return [ @@ -140,41 +134,32 @@ def get_table(self, *args, **kwargs): return table - def get_partitions(self, db_name, tbl_name, max_parts): if max_parts is None: max_parts = -1 return self.meta_client.get_partitions(db_name, tbl_name, max_parts) - def use(self, query): pass - def query(self, query, statement=0): return HiveServerQueryHandle(secret='mock', guid='mock') - def get_state(self, handle): return QueryHistory.STATE.available - def close(self, handle): pass - - def get_operation_status(self, handle): + def get_operation_status(self, handle, session=None): return MockFinishedOperation() - def get_default_configuration(self, *args, **kwargs): return [] - def fetch(self, handle, start_over=False, max_rows=None): return EmptyResultCompatible() - @classmethod def get_security(cls, query_server=None): cluster_conf = hadoop.cluster.get_cluster_conf_for_job_submission() @@ -193,7 +178,6 @@ def get_security(cls, query_server=None): return use_sasl, kerberos_principal_short_name - def meta_client(self): """Get the Thrift client to talk to the metastore""" @@ -281,7 +265,7 @@ def alter_partition(self, db_name, tbl_name, new_part): self._encode_partition(new_part) return self._client.alter_partition(db_name, tbl_name, new_part) - use_sasl, kerberos_principal_short_name = HiveMetastoreClient.get_security() # TODO Reuse from HiveServer2 lib + use_sasl, kerberos_principal_short_name = HiveMetastoreClient.get_security() # TODO Reuse from HiveServer2 lib client = thrift_util.get_client( ThriftHiveMetastore.Client, diff --git a/apps/beeswax/src/beeswax/server/hive_server2_lib.py b/apps/beeswax/src/beeswax/server/hive_server2_lib.py index afdc60c4cd1..7f5ea87dc4e 100644 --- a/apps/beeswax/src/beeswax/server/hive_server2_lib.py +++ b/apps/beeswax/src/beeswax/server/hive_server2_lib.py @@ -15,28 +15,44 @@ # See the License for the specific language governing permissions and # limitations under the License. 
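# Sketch of the session threading introduced in this patch (the db/handle/session
# objects are assumed to come from an already-open HiveServer2 connection, so
# this is illustrative rather than a standalone script):
#
#   dbms wrapper .get_log(handle, start_over, session=s)   # beeswax/server/dbms.py
#     -> client.get_log(handle, start_over, session=s)     # HiveServerClientCompatible
#       -> _client.get_log(operation_handle, session=s)    # HiveServerClient
#         -> self.call(self._client.GetLog, req, session=s)
#
# and likewise for get_operation_status(). The intent, as far as this patch
# shows, is to let call() reuse the caller-supplied Session instead of resolving
# one itself; the metastore mock client accepts the same keyword so call sites
# stay uniform.
status = db.get_operation_status(handle, session=session)
logs = db.get_log(handle, start_over=False, session=session)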
-from builtins import next, filter, map, object -import logging -import json import re import sys - +import json +import logging +from builtins import filter, map, next, object from operator import itemgetter from TCLIService import TCLIService -from TCLIService.ttypes import TOpenSessionReq, TGetTablesReq, TFetchResultsReq, TStatusCode, TGetResultSetMetadataReq, \ - TGetColumnsReq, TTypeId, TExecuteStatementReq, TGetOperationStatusReq, TFetchOrientation, \ - TCloseSessionReq, TGetSchemasReq, TGetLogReq, TCancelOperationReq, TCloseOperationReq, TFetchResultsResp, TRowSet, TGetFunctionsReq, \ - TGetCrossReferenceReq, TGetPrimaryKeysReq - -from desktop.lib import python_util, thrift_util -from desktop.conf import DEFAULT_USER, USE_THRIFT_HTTP_JWT, ENABLE_XFF_FOR_HIVE_IMPALA, ENABLE_X_CSRF_TOKEN_FOR_HIVE_IMPALA +from TCLIService.ttypes import ( + TCancelOperationReq, + TCloseOperationReq, + TCloseSessionReq, + TExecuteStatementReq, + TFetchOrientation, + TFetchResultsReq, + TFetchResultsResp, + TGetColumnsReq, + TGetCrossReferenceReq, + TGetFunctionsReq, + TGetLogReq, + TGetOperationStatusReq, + TGetPrimaryKeysReq, + TGetResultSetMetadataReq, + TGetSchemasReq, + TGetTablesReq, + TOpenSessionReq, + TRowSet, + TStatusCode, + TTypeId, +) from beeswax import conf as beeswax_conf, hive_site -from beeswax.hive_site import hiveserver2_use_ssl from beeswax.conf import CONFIG_WHITELIST, LIST_PARTITIONS_LIMIT, MAX_CATALOG_SQL_ENTRIES -from beeswax.models import Session, HiveServerQueryHandle, HiveServerQueryHistory -from beeswax.server.dbms import Table, DataTable, QueryServerException, InvalidSessionQueryServerException, reset_ha +from beeswax.hive_site import hiveserver2_use_ssl +from beeswax.models import HiveServerQueryHandle, HiveServerQueryHistory, Session +from beeswax.server.dbms import DataTable, InvalidSessionQueryServerException, QueryServerException, Table, reset_ha +from desktop.conf import DEFAULT_USER, ENABLE_X_CSRF_TOKEN_FOR_HIVE_IMPALA, ENABLE_XFF_FOR_HIVE_IMPALA, USE_THRIFT_HTTP_JWT +from desktop.lib import python_util, thrift_util from notebook.connectors.base import get_interpreter if sys.version_info[0] > 2: @@ -1095,18 +1111,18 @@ def fetch_log(self, operation_handle, orientation=TFetchOrientation.FETCH_NEXT, return '\n'.join(lines) - def get_operation_status(self, operation_handle): + def get_operation_status(self, operation_handle, session=None): req = TGetOperationStatusReq(operationHandle=operation_handle) - (res, session) = self.call(self._client.GetOperationStatus, req) + (res, session) = self.call(self._client.GetOperationStatus, req, session=session) return res - def get_log(self, operation_handle): + def get_log(self, operation_handle, session=None): try: req = TGetLogReq(operationHandle=operation_handle) - (res, session) = self.call(self._client.GetLog, req) + (res, session) = self.call(self._client.GetLog, req, session=session) return res.log except Exception as e: - if 'Invalid query handle' in str(e): + if 'Invalid query handle' in str(e) or 'Invalid or unknown query handle' in str(e): message = 'Invalid query handle' LOG.error('%s: %s' % (message, e)) else: @@ -1428,9 +1444,9 @@ def get_state(self, handle): res = self._client.get_operation_status(operationHandle) return HiveServerQueryHistory.STATE_MAP[res.operationState] - def get_operation_status(self, handle): + def get_operation_status(self, handle, session=None): operationHandle = handle.get_rpc_handle() - return self._client.get_operation_status(operationHandle) + return 
self._client.get_operation_status(operationHandle, session=session) def use(self, query, session=None): data = self._client.execute_query(query, session=session) @@ -1474,11 +1490,11 @@ def close_session(self, session): def dump_config(self): return 'Does not exist in HS2' - def get_log(self, handle, start_over=True): + def get_log(self, handle, start_over=True, session=None): operationHandle = handle.get_rpc_handle() if beeswax_conf.USE_GET_LOG_API.get() or self.query_server.get('dialect') == 'impala': - return self._client.get_log(operationHandle) + return self._client.get_log(operationHandle, session=session) else: if start_over: orientation = TFetchOrientation.FETCH_FIRST diff --git a/apps/jobbrowser/src/jobbrowser/apis/query_api.py b/apps/jobbrowser/src/jobbrowser/apis/query_api.py index a7fe3f3dfbf..9644f461c44 100644 --- a/apps/jobbrowser/src/jobbrowser/apis/query_api.py +++ b/apps/jobbrowser/src/jobbrowser/apis/query_api.py @@ -15,38 +15,37 @@ # See the License for the specific language governing permissions and # limitations under the License. -from builtins import filter -from builtins import range -import itertools -import logging +import os import re import sys import time +import logging +import itertools +from builtins import filter, range from datetime import datetime +from urllib.parse import urlparse + import pytz from babel import localtime -import os - -from urllib.parse import urlparse +from beeswax.common import extract_session_type from desktop.lib import export_csvxls from impala.conf import COORDINATOR_UI_SPNEGO +from jobbrowser.apis.base_api import Api from libanalyze import analyze as analyzer, rules from notebook.conf import ENABLE_QUERY_ANALYSIS -from jobbrowser.apis.base_api import Api - if sys.version_info[0] > 2: from django.utils.translation import gettext as _ else: from django.utils.translation import ugettext as _ -ANALYZER = rules.TopDownAnalysis() # We need to parse some files so save as global +ANALYZER = rules.TopDownAnalysis() # We need to parse some files so save as global LOG = logging.getLogger() try: - from beeswax.models import Session, Compute - from impala.server import get_api as get_impalad_api, _get_impala_server_url + from beeswax.models import Compute, Session + from impala.server import _get_impala_server_url, get_api as get_impalad_api except ImportError as e: LOG.exception('Some application are not enabled: %s' % e) @@ -60,12 +59,13 @@ def _get_api(user, cluster=None): server_url = compute['options'].get('api_url') else: # TODO: multi computes if snippet.get('compute') or snippet['type'] has computes - application = cluster['compute']['type'] if cluster.get('compute') else cluster.get('interface', 'impala') + application = extract_session_type(cluster) or 'impala' session = Session.objects.get_session(user, application=application) server_url = _get_impala_server_url(session) return get_impalad_api(user=user, url=server_url) -def _convert_to_6_digit_ms_local_time(start_time): + +def _convert_to_6_digit_ms_local_time(start_time): if '.' 
in start_time: time, microseconds = start_time.split('.') if len(microseconds) > 6: @@ -73,7 +73,7 @@ def _convert_to_6_digit_ms_local_time(start_time): start_time = '.'.join([time, microseconds]) else: start_time = f'{start_time}.000000' - + local_tz = pytz.timezone(os.environ.get('TZ', 'UTC')) # Convert to datetime object in UTC, convert to provided timezone, and then format back into a string return (datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S.%f") @@ -81,6 +81,7 @@ def _convert_to_6_digit_ms_local_time(start_time): .astimezone(local_tz) .strftime("%Y-%m-%d %H:%M:%S.%f")) + class QueryApi(Api): def __init__(self, user, impala_api=None, cluster=None): @@ -96,7 +97,7 @@ def apps(self, filters): filter_list = self._get_filter_list(filters) jobs_iter = itertools.chain(jobs['in_flight_queries'], jobs['completed_queries']) - jobs_iter_filtered = self._n_filter(filter_list, jobs_iter) + jobs_iter_filtered = self._n_filter(filter_list, jobs_iter) apps = { 'apps': sorted([{ @@ -130,8 +131,8 @@ def apps(self, filters): def _time_in_ms_groups(self, groups): time = 0 for x in range(0, len(groups), 3): - if groups[x+1]: - time += self._time_in_ms(groups[x+1], groups[x+2]) + if groups[x + 1]: + time += self._time_in_ms(groups[x + 1], groups[x + 2]) return time def _time_in_ms(self, time, period): @@ -142,9 +143,9 @@ def _time_in_ms(self, time, period): elif period == 's': return float(time) * 1000 elif period == 'm': - return float(time) * 60000 #1000*60 + return float(time) * 60000 # 1000*60 elif period == 'h': - return float(time) * 3600000 #1000*60*60 + return float(time) * 3600000 # 1000*60*60 elif period == 'd': return float(time) * 86400000 # 1000*60*60*24 else: @@ -164,8 +165,8 @@ def app(self, appid): parsed_api_url = urlparse(self.api.url) app.update({ - 'progress': float(progress_groups.group(1)) \ - if progress_groups and progress_groups.group(1) else 100 \ + 'progress': float(progress_groups.group(1)) + if progress_groups and progress_groups.group(1) else 100 if self._api_status(app.get('status')) in ['SUCCEEDED', 'FAILED'] else 1, 'type': 'queries', 'doc_url': '%s/query_plan?query_id=%s' % (self.api.url, appid) if not COORDINATOR_UI_SPNEGO.get() else @@ -195,7 +196,7 @@ def action(self, appid, action): elif result.get('contents') and message.get('status') != -1: message['message'] = result.get('contents') - return message; + return message def logs(self, appid, app_type, log_name=None, is_embeddable=False): return {'logs': ''} @@ -214,12 +215,11 @@ def profile(self, appid, app_type, app_property, app_filters): else: return self._query(appid) - def profile_encoded(self, appid): return self.api.get_query_profile_encoded(query_id=appid) def _memory(self, appid, app_type, app_property, app_filters): - return self.api.get_query_memory(query_id=appid); + return self.api.get_query_memory(query_id=appid) def _metrics(self, appid): query_profile = self.api.get_query_profile_encoded(appid) @@ -253,6 +253,7 @@ def get_exchange_icon(o): return {'svg': 'hi-random'} else: return {'svg': 'hi-exchange'} + def get_sigma_icon(o): if re.search(r'streaming', o['label_detail'], re.IGNORECASE): return {'svg': 'hi-sigma'} @@ -275,6 +276,7 @@ def get_sigma_icon(o): 'ANALYTIC': {'type': 'SINGULAR', 'icon': {'svg': 'hi-timeline'}}, 'UNION': {'type': 'UNION', 'icon': {'svg': 'hi-merge'}} } + def process(node, mapping=mapping): node['id'], node['name'] = node['label'].split(':') details = mapping.get(node['name']) @@ -335,7 +337,7 @@ def make_lambda(name, value): return lambda app: app[name] == value for key, 
name in list(filter_names.items()): - text_filter = re.search(r"\s*("+key+")\s*:([^ ]+)", filters.get("text")) + text_filter = re.search(r"\s*(" + key + r")\s*:([^ ]+)", filters.get("text")) if text_filter and text_filter.group(1) == key: filter_list.append(make_lambda(name, text_filter.group(2).strip())) if filters.get("time"): diff --git a/desktop/libs/notebook/src/notebook/connectors/base.py b/desktop/libs/notebook/src/notebook/connectors/base.py index 2d8c0e1f2fc..2cd8d536ae1 100644 --- a/desktop/libs/notebook/src/notebook/connectors/base.py +++ b/desktop/libs/notebook/src/notebook/connectors/base.py @@ -25,7 +25,7 @@ from django.utils.encoding import smart_str -from beeswax.common import find_compute, is_compute +from beeswax.common import find_compute, find_compute_in_cluster from desktop.auth.backend import is_admin from desktop.conf import TASK_SERVER, has_connectors from desktop.lib import export_csvxls @@ -408,7 +408,7 @@ def patch_snippet_for_connector(snippet, user=None): Connector backward compatibility switcher. # TODO Connector unification """ - if is_compute(snippet): + if find_compute_in_cluster(snippet): snippet['connector'] = find_compute(cluster=snippet, user=user) if snippet['connector'] and snippet['connector'].get('dialect'): snippet['dialect'] = snippet['connector']['dialect'] @@ -439,7 +439,7 @@ def get_api(request, snippet): if has_connectors() and snippet.get('type') == 'hello' and is_admin(request.user): LOG.debug('Using the interpreter from snippet') interpreter = snippet.get('interpreter') - elif is_compute(snippet): + elif find_compute_in_cluster(snippet): LOG.debug("Finding the compute from db using snippet: %s" % snippet) interpreter = find_compute(cluster=snippet, user=request.user) else: diff --git a/desktop/libs/notebook/src/notebook/connectors/hiveserver2.py b/desktop/libs/notebook/src/notebook/connectors/hiveserver2.py index 879402f2564..1fa5e80dded 100644 --- a/desktop/libs/notebook/src/notebook/connectors/hiveserver2.py +++ b/desktop/libs/notebook/src/notebook/connectors/hiveserver2.py @@ -16,20 +16,19 @@ # limitations under the License. 
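# Standalone sketch of what _convert_to_6_digit_ms_local_time() above does with
# an Impala start time: trim sub-microsecond digits so strptime accepts the
# value, treat it as UTC, then render it in the process's TZ. The sample
# timestamp is illustrative.
import os
from datetime import datetime

import pytz

start_time = '2024-03-01 10:20:30.123456789'          # Impala can report more than 6 fractional digits
time_part, microseconds = start_time.split('.')
start_time = '.'.join([time_part, microseconds[:6]])  # %f accepts at most 6 digits

local_tz = pytz.timezone(os.environ.get('TZ', 'UTC'))
local = (datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S.%f')
         .replace(tzinfo=pytz.utc)
         .astimezone(local_tz))
print(local.strftime('%Y-%m-%d %H:%M:%S.%f'))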
from __future__ import division -from future import standard_library -standard_library.install_aliases() -from builtins import next, object -import binascii + +import re +import sys import copy import json -import logging -import re import struct -import sys +import logging +import binascii +from builtins import next, object from django.urls import reverse -from beeswax.common import is_compute +from beeswax.common import extract_session_type from desktop.auth.backend import is_admin from desktop.conf import USE_DEFAULT_CONFIGURATION, has_connectors from desktop.lib.conf import BoundConfig @@ -40,46 +39,63 @@ from desktop.lib.rest.http_client import RestException from desktop.lib.thrift_util import unpack_guid, unpack_guid_base64 from desktop.models import DefaultConfiguration, Document2 - -from notebook.connectors.base import Api, QueryError, QueryExpired, OperationTimeout, OperationNotSupported, _get_snippet_name, Notebook, \ - get_interpreter, patch_snippet_for_connector +from notebook.connectors.base import ( + Api, + Notebook, + OperationNotSupported, + OperationTimeout, + QueryError, + QueryExpired, + _get_snippet_name, + get_interpreter, + patch_snippet_for_connector, +) if sys.version_info[0] > 2: from urllib.parse import quote as urllib_quote, unquote as urllib_unquote + from django.utils.translation import gettext as _ else: - from django.utils.translation import ugettext as _ from urllib import quote as urllib_quote, unquote as urllib_unquote + from django.utils.translation import ugettext as _ + LOG = logging.getLogger() try: from beeswax import conf as beeswax_conf, data_export from beeswax.api import _autocomplete, _get_sample_data - from beeswax.conf import CONFIG_WHITELIST as hive_settings, DOWNLOAD_ROW_LIMIT, DOWNLOAD_BYTES_LIMIT, MAX_NUMBER_OF_SESSIONS, \ - has_session_pool, has_multiple_sessions, CLOSE_SESSIONS + from beeswax.conf import ( + CLOSE_SESSIONS, + CONFIG_WHITELIST as hive_settings, + DOWNLOAD_BYTES_LIMIT, + DOWNLOAD_ROW_LIMIT, + MAX_NUMBER_OF_SESSIONS, + has_multiple_sessions, + has_session_pool, + ) from beeswax.data_export import upload from beeswax.design import hql_query from beeswax.models import QUERY_TYPES, HiveServerQueryHandle, HiveServerQueryHistory, QueryHistory, Session from beeswax.server import dbms - from beeswax.server.dbms import get_query_server_config, QueryServerException, reset_ha + from beeswax.server.dbms import QueryServerException, get_query_server_config, reset_ha from beeswax.views import parse_out_jobs, parse_out_queries except ImportError as e: LOG.warning('Hive and HiveServer2 interfaces are not enabled: %s' % e) hive_settings = None try: - from impala import api # Force checking if Impala is enabled + from impala import api # Force checking if Impala is enabled from impala.conf import CONFIG_WHITELIST as impala_settings - from impala.server import get_api as get_impalad_api, ImpalaDaemonApiException, _get_impala_server_url + from impala.server import ImpalaDaemonApiException, _get_impala_server_url, get_api as get_impalad_api except ImportError as e: LOG.warning("Impala app is not enabled") impala_settings = None try: from jobbrowser.apis.query_api import _get_api - from jobbrowser.conf import ENABLE_QUERY_BROWSER, ENABLE_HIVE_QUERY_BROWSER + from jobbrowser.conf import ENABLE_HIVE_QUERY_BROWSER, ENABLE_QUERY_BROWSER from jobbrowser.views import get_job has_query_browser = ENABLE_QUERY_BROWSER.get() has_hive_query_browser = ENABLE_HIVE_QUERY_BROWSER.get() @@ -179,7 +195,6 @@ class HS2Api(Api): def get_properties(lang='hive'): 
return ImpalaConfiguration.PROPERTIES if lang == 'impala' else HiveConfiguration.PROPERTIES - @query_error_handler def create_session(self, lang='hive', properties=None): application = 'beeswax' if lang == 'hive' or lang == 'llap' else lang @@ -246,7 +261,6 @@ def create_session(self, lang='hive', properties=None): return response - @query_error_handler def close_session(self, session): app_name = session.get('type') @@ -281,7 +295,6 @@ def close_session(self, session): return response - def close_session_idle(self, notebook, session): idle = True response = {'result': []} @@ -317,16 +330,14 @@ def execute(self, notebook, snippet): db = self._get_db(snippet, interpreter=self.interpreter) statement = self._get_current_statement(notebook, snippet) - compute = snippet.get('compute', {}) - session_type = compute['name'] if is_compute(snippet) and compute.get('name') else snippet['type'] + session_type = extract_session_type(snippet) session = self._get_session(notebook, session_type) query = self._prepare_hql_query(snippet, statement['statement'], session) _session = self._get_session_by_id(notebook, session_type) - try: - if statement.get('statement_id') == 0: # TODO: move this to client + if statement.get('statement_id') == 0: # TODO: move this to client if query.database and not statement['statement'].lower().startswith('set'): result = db.use(query.database, session=_session) if result.session: @@ -356,7 +367,6 @@ def execute(self, notebook, snippet): return response - @query_error_handler def check_status(self, notebook, snippet): response = {} @@ -384,7 +394,6 @@ def check_status(self, notebook, snippet): return response - @query_error_handler def fetch_result(self, notebook, snippet, rows, start_over): db = self._get_db(snippet, interpreter=self.interpreter) @@ -411,7 +420,6 @@ def fetch_result(self, notebook, snippet, rows, start_over): 'type': 'table' } - @query_error_handler def fetch_result_size(self, notebook, snippet): resp = { @@ -440,7 +448,6 @@ def fetch_result_size(self, notebook, snippet): return resp - @query_error_handler def cancel(self, notebook, snippet): db = self._get_db(snippet, interpreter=self.interpreter) @@ -449,7 +456,6 @@ def cancel(self, notebook, snippet): db.cancel_operation(handle) return {'status': 0} - @query_error_handler def get_log(self, notebook, snippet, startFrom=None, size=None): db = self._get_db(snippet, interpreter=self.interpreter) @@ -457,7 +463,6 @@ def get_log(self, notebook, snippet, startFrom=None, size=None): handle = self._get_handle(snippet) return db.get_log(handle, start_over=startFrom == 0) - @query_error_handler def close_statement(self, notebook, snippet): db = self._get_db(snippet, interpreter=self.interpreter) @@ -472,7 +477,6 @@ def close_statement(self, notebook, snippet): raise e return {'status': 0} - def can_start_over(self, notebook, snippet): try: db = self._get_db(snippet, interpreter=self.interpreter) @@ -484,13 +488,12 @@ def can_start_over(self, notebook, snippet): raise e return can_start_over - @query_error_handler def progress(self, notebook, snippet, logs=''): patch_snippet_for_connector(snippet) if snippet['dialect'] == 'hive': - match = re.search('Total jobs = (\d+)', logs, re.MULTILINE) + match = re.search(r'Total jobs = (\d+)', logs, re.MULTILINE) total = int(match.group(1)) if match else 1 started = logs.count('Starting Job') @@ -499,13 +502,12 @@ def progress(self, notebook, snippet, logs=''): progress = int((started + ended) * 100 / (total * 2)) return max(progress, 5) # Return 5% progress as a minimum elif 
snippet['dialect'] == 'impala': - match = re.findall('(\d+)% Complete', logs, re.MULTILINE) + match = re.findall(r'(\d+)% Complete', logs, re.MULTILINE) # Retrieve the last reported progress percentage if it exists return int(match[-1]) if match and isinstance(match, list) else 0 else: return 50 - @query_error_handler def get_jobs(self, notebook, snippet, logs): jobs = [] @@ -552,7 +554,6 @@ def get_jobs(self, notebook, snippet, logs): return jobs - @query_error_handler def autocomplete(self, snippet, database=None, table=None, column=None, nested=None, operation=None): db = self._get_db(snippet, interpreter=self.interpreter) @@ -577,7 +578,6 @@ def autocomplete(self, snippet, database=None, table=None, column=None, nested=N return resp - @query_error_handler def get_sample_data(self, snippet, database=None, table=None, column=None, is_async=False, operation=None): try: @@ -586,7 +586,6 @@ def get_sample_data(self, snippet, database=None, table=None, column=None, is_as except QueryServerException as ex: raise QueryError(ex.message) - @query_error_handler def explain(self, notebook, snippet): db = self._get_db(snippet, interpreter=self.interpreter) @@ -613,7 +612,6 @@ def explain(self, notebook, snippet): 'statement': statement, } - @query_error_handler def export_data_as_hdfs_file(self, snippet, target_file, overwrite): db = self._get_db(snippet, interpreter=self.interpreter) @@ -626,8 +624,7 @@ def export_data_as_hdfs_file(self, snippet, target_file, overwrite): return '/filebrowser/view=%s' % urllib_quote( urllib_quote(target_file.encode('utf-8'), safe=SAFE_CHARACTERS_URI_COMPONENTS) - ) # Quote twice, because of issue in the routing on client - + ) # Quote twice, because of issue in the routing on client def export_data_as_table(self, notebook, snippet, destination, is_temporary=False, location=None): db = self._get_db(snippet, interpreter=self.interpreter) @@ -654,7 +651,6 @@ def export_data_as_table(self, notebook, snippet, destination, is_temporary=Fals return hql, success_url - def export_large_data_to_hdfs(self, notebook, snippet, destination): response = self._get_current_statement(notebook, snippet) session = self._get_session(notebook, snippet['type']) @@ -684,7 +680,6 @@ def export_large_data_to_hdfs(self, notebook, snippet, destination): return hql, success_url - def upgrade_properties(self, lang='hive', properties=None): upgraded_properties = copy.deepcopy(self.get_properties(lang)) @@ -708,7 +703,6 @@ def upgrade_properties(self, lang='hive', properties=None): return upgraded_properties - def _get_session(self, notebook, type='hive'): session = next((session for session in notebook['sessions'] if session['type'] == type), None) return session @@ -723,7 +717,6 @@ def _get_session_by_id(self, notebook, type='hive'): filters['owner'] = self.user return Session.objects.get(**filters) - def _get_hive_execution_engine(self, notebook, snippet): # Get hive.execution.engine from snippet properties, if none, then get from session properties = snippet['properties'] @@ -746,7 +739,6 @@ def _get_hive_execution_engine(self, notebook, snippet): return engine - def _prepare_hql_query(self, snippet, statement, session): settings = snippet['properties'].get('settings', None) file_resources = snippet['properties'].get('files', None) @@ -775,7 +767,6 @@ def _prepare_hql_query(self, snippet, statement, session): database=database ) - def get_browse_query(self, snippet, database, table, partition_spec=None): db = self._get_db(snippet, interpreter=self.interpreter) table = db.get_table(database, 
table) @@ -789,7 +780,6 @@ def get_browse_query(self, snippet, database, table, partition_spec=None): else: return db.get_select_star_query(database, table, limit=100) - def _get_handle(self, snippet): try: handle = snippet['result']['handle'].copy() @@ -805,7 +795,6 @@ def _get_handle(self, snippet): return HiveServerQueryHandle(**handle) - def _get_db(self, snippet, is_async=False, interpreter=None): if interpreter and interpreter.get('dialect'): dialect = interpreter['dialect'] @@ -828,7 +817,6 @@ def _get_db(self, snippet, is_async=False, interpreter=None): # Note: name is not used if interpreter is present return dbms.get(self.user, query_server=get_query_server_config(name=name, connector=interpreter)) - def _parse_job_counters(self, job_id): # Attempt to fetch total records from the job's Hive counter total_records, total_size = None, None @@ -864,7 +852,6 @@ def _parse_job_counters(self, job_id): return total_records, total_size - def _get_hive_result_size(self, notebook, snippet): total_records, total_size, msg = None, None, None engine = self._get_hive_execution_engine(notebook, snippet).lower() @@ -879,8 +866,8 @@ def _get_hive_result_size(self, notebook, snippet): else: msg = _('Hive query did not execute any jobs.') elif engine == 'spark': - total_records_re = "RECORDS_OUT_0: (?P\d+)" - total_size_re = "Spark Job\[[a-z0-9-]+\] Metrics[A-Za-z0-9:\s]+ResultSize: (?P\d+)" + total_records_re = r"RECORDS_OUT_0: (?P\d+)" + total_size_re = r"Spark Job\[[a-z0-9-]+\] Metrics[A-Za-z0-9:\s]+ResultSize: (?P\d+)" total_records_match = re.search(total_records_re, logs, re.MULTILINE) total_size_match = re.search(total_size_re, logs, re.MULTILINE) @@ -891,7 +878,6 @@ def _get_hive_result_size(self, notebook, snippet): return total_records, total_size, msg - def _get_impala_result_size(self, notebook, snippet): total_records_match = None total_records, total_size, msg = None, None, None @@ -904,7 +890,7 @@ def _get_impala_result_size(self, notebook, snippet): fragment = self._get_impala_query_profile(server_url, query_id=query_id) total_records_re = \ - "Coordinator Fragment F\d\d.+?RowsReturned: \d+(?:.\d+[KMB])? \((?P\d+)\).*?(Averaged Fragment F\d\d)" + r"Coordinator Fragment F\d\d.+?RowsReturned: \d+(?:.\d+[KMB])? 
\((?P\d+)\).*?(Averaged Fragment F\d\d)" total_records_match = re.search(total_records_re, fragment, re.MULTILINE | re.DOTALL) if total_records_match: @@ -917,7 +903,6 @@ def _get_impala_result_size(self, notebook, snippet): return total_records, total_size, msg - def _get_impala_query_id(self, snippet): guid = None if 'result' in snippet and 'handle' in snippet['result'] and 'guid' in snippet['result']['handle']: @@ -929,7 +914,6 @@ def _get_impala_query_id(self, snippet): LOG.warning('Snippet does not contain a valid result handle, cannot extract Impala query ID.') return guid - def _get_impala_query_profile(self, server_url, query_id): api = get_impalad_api(user=self.user, url=server_url) @@ -944,18 +928,15 @@ def _get_impala_query_profile(self, server_url, query_id): return profile - def _get_impala_profile_plan(self, query_id, profile): - query_plan_re = "Query \(id=%(query_id)s\):.+?Execution Profile %(query_id)s" % {'query_id': query_id} + query_plan_re = r"Query \(id=%(query_id)s\):.+?Execution Profile %(query_id)s" % {'query_id': query_id} query_plan_match = re.search(query_plan_re, profile, re.MULTILINE | re.DOTALL) return query_plan_match.group() if query_plan_match else None - def describe_column(self, notebook, snippet, database=None, table=None, column=None): db = self._get_db(snippet, interpreter=self.interpreter) return db.get_table_columns_stats(database, table, column) - def describe_table(self, notebook, snippet, database=None, table=None): db = self._get_db(snippet, interpreter=self.interpreter) tb = db.get_table(database, table)
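# Quick sketch of the (now raw-string) progress parsing in HS2Api.progress()
# earlier in this file, runnable with just the standard library; the log
# excerpts are made up for illustration.
import re

impala_logs = "Query progress: 10% Complete\nQuery progress: 45% Complete\n"
match = re.findall(r'(\d+)% Complete', impala_logs, re.MULTILINE)
print(int(match[-1]) if match else 0)     # 45 - the last reported percentage wins

hive_logs = "Total jobs = 2\nStarting Job 1\nEnded Job 1\nStarting Job 2\n"
total_match = re.search(r'Total jobs = (\d+)', hive_logs, re.MULTILINE)
total = int(total_match.group(1)) if total_match else 1
started, ended = hive_logs.count('Starting Job'), hive_logs.count('Ended Job')
print(max(int((started + ended) * 100 / (total * 2)), 5))   # 75, floored at 5%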