Skip to content

Commit

Permalink
[computes] fixed session-type extraction for connectors
Browse files Browse the repository at this point in the history
The problem is that connector-based query execution is not able to
reuse a session to fetch results. The frontend sends the correct
session_id, but our session-fetching logic broke when computes
were implemented. We now look up the session_type from
compute['name'] for computes, connector['name'] for connectors, and
then snippet['type'] for old config-file-based hive/impala sessions.

A related change is to make use of the session for get_log and
check_status calls if the frontend sends it.

The rest is ruff lint cleanup and other refactoring.
  • Loading branch information
amitsrivastava committed May 22, 2024
1 parent 9e27d5f commit f292cb3
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 244 deletions.
40 changes: 27 additions & 13 deletions apps/beeswax/src/beeswax/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
"""
from __future__ import print_function

import numbers
import re
import time
import numbers

from django import forms

from beeswax.models import Namespace, Compute
from beeswax.models import Compute, Namespace

HIVE_IDENTIFER_REGEX = re.compile("(^[a-zA-Z0-9]\w*\.)?[a-zA-Z0-9]\w*$")
HIVE_IDENTIFER_REGEX = re.compile(r"(^[a-zA-Z0-9]\w*\.)?[a-zA-Z0-9]\w*$")

DL_FORMATS = ['csv', 'xls']

Expand Down Expand Up @@ -56,12 +56,13 @@
(' ', "Space", 32),
]


def timing(fn):
    """Decorator that prints how long each call to ``fn`` takes.

    The elapsed wall-clock time is measured with ``time.time()`` and
    printed in milliseconds after the wrapped call returns. The wrapped
    function's return value is passed through unchanged. If ``fn`` raises,
    nothing is printed and the exception propagates.
    """
    # Imported locally so the block does not depend on a file-level import.
    import functools

    # functools.wraps keeps fn's __name__/__doc__ on the returned wrapper so
    # decorated functions remain identifiable in logs and introspection.
    @functools.wraps(fn)
    def decorator(*args, **kwargs):
        time1 = time.time()
        ret = fn(*args, **kwargs)
        time2 = time.time()
        print('%s elapsed time: %0.3f ms' % (fn.__name__, (time2 - time1) * 1000.0))
        return ret
    return decorator

Expand All @@ -79,7 +80,8 @@ def apply_natural_sort(collection, key=None):
Applies a natural sort (http://rosettacode.org/wiki/Natural_sorting) to a list or dictionary
Dictionary types require a sort key to be specified
"""
to_digit = lambda i: int(i) if i.isdigit() else i
def to_digit(i):
return int(i) if i.isdigit() else i

def tokenize_and_convert(item, key=None):
if key:
Expand All @@ -89,13 +91,26 @@ def tokenize_and_convert(item, key=None):
return sorted(collection, key=lambda i: tokenize_and_convert(i, key=key))


def find_compute_in_cluster(cluster):
    """Return the compute-like dict found in ``cluster``, or ``None``.

    Candidates are examined in priority order: ``cluster`` itself, then
    ``cluster['compute']``, then ``cluster['connector']``. A candidate
    qualifies when it is truthy and its ``'type'`` value is a member of
    ``COMPUTE_TYPES``. The first qualifying candidate is returned.

    Returns ``None`` when ``cluster`` is empty/``None`` or when no candidate
    qualifies.
    """
    if not cluster:
        return None

    candidates = (cluster, cluster.get('compute'), cluster.get('connector'))
    for candidate in candidates:
        if candidate and candidate.get('type') in COMPUTE_TYPES:
            return candidate
    return None


def is_compute(cluster):
    """Return ``True`` if ``cluster`` contains a compute, else ``False``.

    Thin wrapper kept so code that still calls the earlier name keeps
    working; the lookup itself is done by ``find_compute_in_cluster``.
    """
    return bool(find_compute_in_cluster(cluster))


def extract_session_type(snippet):
    """Work out which session type a snippet should use.

    If ``find_compute_in_cluster`` locates a compute in ``snippet`` and that
    compute has a truthy ``'name'``, the name is returned. Otherwise the
    snippet's own ``'type'`` is returned, or ``None`` when ``snippet`` is
    empty/``None``.
    """
    matched = find_compute_in_cluster(snippet)
    session_name = matched.get('name') if matched else None
    if session_name:
        return session_name

    # No named compute: fall back to the snippet's declared type.
    if not snippet:
        return None
    return snippet.get('type')


'''
Expand All @@ -107,17 +122,16 @@ def is_compute(cluster):
3. Lookup namespace based on dialect from cluster or provided dialect
and return the first compute filtered by user-access. Needs valid user
'''


def find_compute(cluster=None, user=None, dialect=None, namespace_id=None):
if cluster:
# If we find a full/partial cluster object, we will attempt to load a compute
connector = cluster.get('connector')
compute = cluster.get('compute')
compute_check = lambda x: x and x.get('type') in COMPUTE_TYPES

# Pick the most probable compute object
selected_compute = (cluster if compute_check(cluster)
else compute if compute_check(compute)
else connector if compute_check(connector) else None)
selected_compute = find_compute_in_cluster(cluster)

# If found, we will attempt to reload it, first by id then by name
if selected_compute:
Expand Down
Loading

0 comments on commit f292cb3

Please sign in to comment.