Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Pipeline Truncation & Depth Query String Support #8

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
*.pyc
.DS_Store
.idea
*.iml
92 changes: 92 additions & 0 deletions python/src/pipeline/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

"""Datastore models used by the Google App Engine Pipeline API."""

import unicodedata

from google.appengine.ext import db
from google.appengine.ext import ndb
from google.appengine.ext import blobstore

try:
Expand All @@ -27,6 +30,25 @@
# Relative imports
import util

def truncate_value(value):
  """Shorten the given value, preserving dict structure.

  Dicts are rebuilt with the same keys and recursively truncated values.
  Datastore keys (ndb or db) are returned as their full URL-safe string.
  Strings that parse as a URL-safe ndb key are returned untouched. Anything
  else is converted to a byte string and cut to at most 100 bytes.

  Args:
    value: The value to shorten; may be a dict, an ndb.Key, a db.Key, a
      string, or any object accepted by str().

  Returns:
    A dict (for dict input) or a string of the shortened value.
  """
  if isinstance(value, dict):
    return dict((k, truncate_value(v)) for k, v in value.iteritems())
  if isinstance(value, ndb.Key):
    return value.urlsafe()
  if isinstance(value, db.Key):
    # db.Key has no urlsafe() method; its str() form is the encoded key.
    return str(value)

  if isinstance(value, basestring):
    # Do not truncate string/urlsafe db keys.
    # TODO: When do we create an ndb.Key vs. db.Key
    try:
      ndb.Key(urlsafe=value)
    except Exception:
      # Not a serialized key: fall through and truncate it like any other
      # string. (Exception rather than a bare except so that SystemExit and
      # KeyboardInterrupt still propagate.)
      pass
    else:
      return value

    if isinstance(value, unicode):
      # Cut at 100 bytes, then drop any multi-byte character that the cut
      # split in half so the result is still valid UTF-8.
      encoded = value.encode('utf8')[:100]
      return encoded.decode('utf8', 'ignore').encode('utf8')
    # Byte strings are already encoded; calling encode() on them would force
    # an implicit ASCII decode and raise on non-ASCII data.
    val = value
  else:
    val = str(value)
  return val[:100]

class _PipelineRecord(db.Model):
"""Represents a Pipeline.
Expand Down Expand Up @@ -116,6 +138,46 @@ def params(self):
self._params_decoded = value
return self._params_decoded

def truncated_copy(self):
  """Build a lightweight stand-in for this pipeline record.

  Returns:
    A _LowMemoryPipelineRecord constructed from this record, whose pipeline
    args are truncated.
  """
  lightweight = _LowMemoryPipelineRecord(self)
  return lightweight

@property
def root_pipeline_key(self):
  """Returns root pipeline key.

  The property object is looked up on the class, not on the instance:
  accessing `self.root_pipeline` goes through the descriptor and yields the
  referenced entity, which has no get_value_for_datastore() method.

  Returns:
    The value _PipelineRecord.root_pipeline holds for this record, as given
    by get_value_for_datastore().
  """
  return _PipelineRecord.root_pipeline.get_value_for_datastore(self)

class _LowMemoryPipelineRecord(object):
  """Substitute for _PipelineRecord that takes up less space.

  This class has most of the attributes of _PipelineRecord (the ones we need to
  display the pipeline information in the UI), except that the pipeline
  arguments are truncated to save space.
  """

  def __init__(self, pipeline_record):
    """Copies the displayable fields out of a pipeline record.

    Args:
      pipeline_record: The record to copy from. It is not modified.
    """
    self.key_value = pipeline_record.key()
    self.class_path = pipeline_record.class_path
    # Skip root pipeline since ReferenceProperties are tricky.
    self.fanned_out = pipeline_record.fanned_out
    self.start_time = pipeline_record.start_time
    self.finalized_time = pipeline_record.finalized_time
    self.status = pipeline_record.status
    self.current_attempt = pipeline_record.current_attempt
    self.max_attempts = pipeline_record.max_attempts
    self.next_retry_time = pipeline_record.next_retry_time
    self.retry_message = pipeline_record.retry_message
    self.is_root_pipeline = pipeline_record.is_root_pipeline
    self.abort_message = pipeline_record.abort_message
    self.abort_requested = pipeline_record.abort_requested
    self.root_pipeline_key = (
        _PipelineRecord.root_pipeline.get_value_for_datastore(pipeline_record))
    # The record caches and hands back its decoded params dict, so work on a
    # shallow copy; assigning into the original would truncate the source
    # record's own args/kwargs as a side effect.
    params = dict(pipeline_record.params)
    params['args'] = [truncate_value(v) for v in params['args']]
    params['kwargs'] = truncate_value(params['kwargs'])
    self.params = params

  def key(self):
    """Returns the key captured from the original pipeline record."""
    return self.key_value


class _SlotRecord(db.Model):
"""Represents an output slot.
Expand Down Expand Up @@ -166,6 +228,36 @@ def value(self):
self._value_decoded = json.loads(encoded_value, cls=util.JsonDecoder)
return self._value_decoded

@property
def filler_pipeline_key(self):
  """Value of this slot's `filler` reference as stored for the datastore.

  Returns:
    Whatever _SlotRecord.filler.get_value_for_datastore() yields for this
    record (presumably the filler pipeline's key; confirm against the db
    API).
  """
  filler_property = _SlotRecord.filler
  return filler_property.get_value_for_datastore(self)

def truncated_copy(self):
  """Build a lightweight stand-in for this slot record.

  Returns:
    A _LowMemorySlotRecord constructed from this record, whose value (if
    any) is truncated.
  """
  lightweight = _LowMemorySlotRecord(self)
  return lightweight


class _LowMemorySlotRecord(object):
  """Substitute for _SlotRecord that takes up less space.

  This class has most of the attributes of _SlotRecord (the ones we need to
  display the slot information in the UI), except that the slot value itself
  (if it exists) is truncated.
  """

  def __init__(self, slot_record):
    """Copies the displayable fields out of a slot record.

    Args:
      slot_record: The record to copy from.
    """
    self.key_value = slot_record.key()
    # Skip root pipeline since ReferenceProperties are tricky.
    self.filler_pipeline_key = slot_record.filler_pipeline_key
    self.status = slot_record.status
    # Always define `value` so callers can read the attribute on any copy;
    # it stays None for a slot that has not been filled.
    self.value = None
    # Getting the "value" property crashes if the slot isn't filled, so we need
    # this check.
    if self.status == _SlotRecord.FILLED:
      self.value = truncate_value(slot_record.value)
    # NOTE(review): fill_time is copied regardless of status so the attribute
    # always exists; this assumes reading it on an unfilled slot is harmless
    # -- confirm against _SlotRecord.
    self.fill_time = slot_record.fill_time

  def key(self):
    """Returns the key captured from the original slot record."""
    return self.key_value


class _BarrierRecord(db.Model):
"""Represents a barrier.
Expand Down
Loading