Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshot abstraction #1603

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions tests/test_repository_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import tuf.roledb
import tuf.keydb
import tuf.settings
import tuf.snapshot

import tuf.repository_lib as repo_lib
import tuf.repository_tool as repo_tool
Expand Down Expand Up @@ -499,25 +500,27 @@ def test_generate_snapshot_metadata(self):
metadata_directory, version, expiration_date, storage_backend = \
self._setup_generate_snapshot_metadata_test()

snapshot_backend = tuf.snapshot.ManifestSnapshot()

snapshot_metadata = \
repo_lib.generate_snapshot_metadata(metadata_directory, version,
snapshot_backend.generate_snapshot_metadata(metadata_directory, version,
expiration_date,
storage_backend,
consistent_snapshot=False)
self.assertTrue(tuf.formats.SNAPSHOT_SCHEMA.matches(snapshot_metadata))


# Test improperly formatted arguments.
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata,
self.assertRaises(securesystemslib.exceptions.FormatError, snapshot_backend.generate_snapshot_metadata,
3, version, expiration_date, consistent_snapshot=False,
storage_backend=storage_backend)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata,
self.assertRaises(securesystemslib.exceptions.FormatError, snapshot_backend.generate_snapshot_metadata,
metadata_directory, '3', expiration_date, storage_backend,
consistent_snapshot=False)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata,
self.assertRaises(securesystemslib.exceptions.FormatError, snapshot_backend.generate_snapshot_metadata,
metadata_directory, version, '3', storage_backend,
consistent_snapshot=False)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata,
self.assertRaises(securesystemslib.exceptions.FormatError, snapshot_backend.generate_snapshot_metadata,
metadata_directory, version, expiration_date, 3,
storage_backend)

Expand All @@ -527,8 +530,10 @@ def test_generate_snapshot_metadata_with_length(self):
metadata_directory, version, expiration_date, storage_backend = \
self._setup_generate_snapshot_metadata_test()

snapshot_backend = tuf.snapshot.ManifestSnapshot()

snapshot_metadata = \
repo_lib.generate_snapshot_metadata(metadata_directory, version,
snapshot_backend.generate_snapshot_metadata(metadata_directory, version,
expiration_date,
storage_backend,
consistent_snapshot=False,
Expand All @@ -541,7 +546,7 @@ def test_generate_snapshot_metadata_with_length(self):
# In the metadata_directory, there are files with format:
# 1.root.json. The prefix number should be removed.
stripped_filename, version = \
repo_lib._strip_version_number(metadata_filename,
repo_lib.strip_version_number(metadata_filename,
consistent_snapshot=True)

# In the repository, the file "role_file.xml" have been added to make
Expand All @@ -558,8 +563,10 @@ def test_generate_snapshot_metadata_with_hashes(self):
metadata_directory, version, expiration_date, storage_backend = \
self._setup_generate_snapshot_metadata_test()

snapshot_backend = tuf.snapshot.ManifestSnapshot()

snapshot_metadata = \
repo_lib.generate_snapshot_metadata(metadata_directory, version,
snapshot_backend.generate_snapshot_metadata(metadata_directory, version,
expiration_date,
storage_backend,
consistent_snapshot=False,
Expand All @@ -572,7 +579,7 @@ def test_generate_snapshot_metadata_with_hashes(self):
# In the metadata_directory, there are files with format:
# 1.root.json. The prefix number should be removed.
stripped_filename, version = \
repo_lib._strip_version_number(metadata_filename,
repo_lib.strip_version_number(metadata_filename,
consistent_snapshot=True)

# In the repository, the file "role_file.xml" have been added to make
Expand All @@ -589,8 +596,10 @@ def test_generate_snapshot_metadata_with_hashes_and_length(self):
metadata_directory, version, expiration_date, storage_backend = \
self._setup_generate_snapshot_metadata_test()

snapshot_backend = tuf.snapshot.ManifestSnapshot()

snapshot_metadata = \
repo_lib.generate_snapshot_metadata(metadata_directory, version,
snapshot_backend.generate_snapshot_metadata(metadata_directory, version,
expiration_date,
storage_backend,
consistent_snapshot=False,
Expand All @@ -604,7 +613,7 @@ def test_generate_snapshot_metadata_with_hashes_and_length(self):
# In the metadata_directory, there are files with format:
# 1.root.json. The prefix number should be removed.
stripped_filename, version = \
repo_lib._strip_version_number(metadata_filename,
repo_lib.strip_version_number(metadata_filename,
consistent_snapshot=True)

# In the repository, the file "role_file.xml" have been added to make
Expand Down
169 changes: 10 additions & 159 deletions tuf/repository_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from tuf import roledb
from tuf import settings
from tuf import sig
from tuf import snapshot


# See 'log.py' to learn how logging is handled in TUF.
Expand Down Expand Up @@ -90,7 +91,7 @@ def _generate_and_write_metadata(rolename, metadata_filename,
increment_version_number=True, repository_name='default',
use_existing_fileinfo=False, use_timestamp_length=True,
use_timestamp_hashes=True, use_snapshot_length=False,
use_snapshot_hashes=False):
use_snapshot_hashes=False, snapshot_backend=None):
"""
Non-public function that can generate and write the metadata for the
specified 'rolename'. It also increments the version number of 'rolename' if
Expand All @@ -117,7 +118,9 @@ def _generate_and_write_metadata(rolename, metadata_filename,


elif rolename == 'snapshot':
metadata = generate_snapshot_metadata(metadata_directory,
if snapshot_backend == None:
snapshot_backend = snapshot.ManifestSnapshot()
metadata = snapshot_backend.generate_snapshot_metadata(metadata_directory,
roleinfo['version'], roleinfo['expires'],
storage_backend, consistent_snapshot, repository_name,
use_length=use_snapshot_length, use_hashes=use_snapshot_hashes)
Expand Down Expand Up @@ -393,7 +396,7 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata,
# have a prepended version number even though the repository is now
# a non-consistent one.
if metadata_role not in snapshot_metadata['meta']:
metadata_role, junk = _strip_version_number(metadata_role,
metadata_role, junk = strip_version_number(metadata_role,
consistent_snapshot)

else:
Expand Down Expand Up @@ -442,7 +445,7 @@ def _get_written_metadata(metadata_signable):



def _strip_version_number(metadata_filename, consistent_snapshot):
def strip_version_number(metadata_filename, consistent_snapshot):
"""
Strip from 'metadata_filename' any version number (in the
expected '{dirname}/<version_number>.rolename.<ext>' format) that
Expand Down Expand Up @@ -851,7 +854,7 @@ def get_delegated_roles_metadata_filenames(metadata_directory,
# Example: '10.django.json' --> 'django.json'
consistent = \
metadata_role.endswith('root.json') or consistent_snapshot == True
metadata_name, junk = _strip_version_number(metadata_role,
metadata_name, junk = strip_version_number(metadata_role,
consistent)

if metadata_name.endswith(METADATA_EXTENSION):
Expand Down Expand Up @@ -1519,7 +1522,7 @@ def _generate_targets_fileinfo(target_files, targets_directory,



def _get_hashes_and_length_if_needed(use_length, use_hashes, full_file_path,
def get_hashes_and_length_if_needed(use_length, use_hashes, full_file_path,
storage_backend):
"""
Calculate length and hashes only if they are required,
Expand All @@ -1541,158 +1544,6 @@ def _get_hashes_and_length_if_needed(use_length, use_hashes, full_file_path,



def generate_snapshot_metadata(metadata_directory, version, expiration_date,
storage_backend, consistent_snapshot=False,
repository_name='default', use_length=False, use_hashes=False):
"""
<Purpose>
Create the snapshot metadata. The minimum metadata must exist (i.e.,
'root.json' and 'targets.json'). This function searches
'metadata_directory' and the resulting snapshot file will list all the
delegated roles found there.

<Arguments>
metadata_directory:
The directory containing the 'root.json' and 'targets.json' metadata
files.

version:
The metadata version number. Clients use the version number to
determine if the downloaded version is newer than the one currently
trusted.

expiration_date:
The expiration date of the metadata file.
Conformant to 'securesystemslib.formats.ISO8601_DATETIME_SCHEMA'.

storage_backend:
An object which implements
securesystemslib.storage.StorageBackendInterface.

consistent_snapshot:
Boolean. If True, a file digest is expected to be prepended to the
filename of any target file located in the targets directory. Each digest
is stripped from the target filename and listed in the snapshot metadata.

repository_name:
The name of the repository. If not supplied, 'rolename' is added to the
'default' repository.

use_length:
Whether to include the optional length attribute for targets
metadata files in the snapshot metadata.
Default is False to save bandwidth but without losing security
from rollback attacks.
Read more at section 5.6 from the Mercury paper:
https://www.usenix.org/conference/atc17/technical-sessions/presentation/kuppusamy

use_hashes:
Whether to include the optional hashes attribute for targets
metadata files in the snapshot metadata.
Default is False to save bandwidth but without losing security
from rollback attacks.
Read more at section 5.6 from the Mercury paper:
https://www.usenix.org/conference/atc17/technical-sessions/presentation/kuppusamy

<Exceptions>
securesystemslib.exceptions.FormatError, if the arguments are improperly
formatted.

securesystemslib.exceptions.Error, if an error occurred trying to generate
the snapshot metadata object.

<Side Effects>
The 'root.json' and 'targets.json' files are read.

<Returns>
The snapshot metadata object, conformant to 'tuf.formats.SNAPSHOT_SCHEMA'.
"""

# Do the arguments have the correct format?
# This check ensures arguments have the appropriate number of objects and
# object types, and that all dict keys are properly named.
# Raise 'securesystemslib.exceptions.FormatError' if the check fails.
sslib_formats.PATH_SCHEMA.check_match(metadata_directory)
formats.METADATAVERSION_SCHEMA.check_match(version)
sslib_formats.ISO8601_DATETIME_SCHEMA.check_match(expiration_date)
sslib_formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot)
sslib_formats.NAME_SCHEMA.check_match(repository_name)
sslib_formats.BOOLEAN_SCHEMA.check_match(use_length)
sslib_formats.BOOLEAN_SCHEMA.check_match(use_hashes)

# Snapshot's 'fileinfodict' shall contain the version number of Root,
# Targets, and all delegated roles of the repository.
fileinfodict = {}

length, hashes = _get_hashes_and_length_if_needed(use_length, use_hashes,
os.path.join(metadata_directory, TARGETS_FILENAME), storage_backend)

targets_role = TARGETS_FILENAME[:-len(METADATA_EXTENSION)]

targets_file_version = get_metadata_versioninfo(targets_role,
repository_name)

# Make file info dictionary with make_metadata_fileinfo because
# in the tuf spec length and hashes are optional for all
# METAFILES in snapshot.json including the top-level targets file.
fileinfodict[TARGETS_FILENAME] = formats.make_metadata_fileinfo(
targets_file_version['version'], length, hashes)

# Search the metadata directory and generate the versioninfo of all the role
# files found there. This information is stored in the 'meta' field of
# 'snapshot.json'.

metadata_files = sorted(storage_backend.list_folder(metadata_directory),
reverse=True)
for metadata_filename in metadata_files:
# Strip the version number if 'consistent_snapshot' is True.
# Example: '10.django.json' --> 'django.json'
metadata_name, junk = _strip_version_number(metadata_filename,
consistent_snapshot)

# All delegated roles are added to the snapshot file.
if metadata_filename.endswith(METADATA_EXTENSION):
rolename = metadata_filename[:-len(METADATA_EXTENSION)]

# Obsolete role files may still be found. Ensure only roles loaded
# in the roledb are included in the Snapshot metadata. Since the
# snapshot and timestamp roles are not listed in snapshot.json, do not
# list these roles found in the metadata directory.
if roledb.role_exists(rolename, repository_name) and \
rolename not in roledb.TOP_LEVEL_ROLES:

length, hashes = _get_hashes_and_length_if_needed(use_length, use_hashes,
os.path.join(metadata_directory, metadata_filename), storage_backend)

file_version = get_metadata_versioninfo(rolename,
repository_name)

fileinfodict[metadata_name] = formats.make_metadata_fileinfo(
file_version['version'], length, hashes)

else:
logger.debug('Metadata file has an unsupported file'
' extension: ' + metadata_filename)

# Generate the Snapshot metadata object.
# Use generalized build_dict_conforming_to_schema func to produce a dict that
# contains all the appropriate information for snapshot metadata,
# checking that the result conforms to the appropriate schema.
# TODO: Later, probably after the rewrite for TUF Issue #660, generalize
# further, upward, by replacing generate_targets_metadata,
# generate_root_metadata, etc. with one function that generates
# metadata, possibly rolling that upwards into the calling function.
# There are very few things that really need to be done differently.
return formats.build_dict_conforming_to_schema(
formats.SNAPSHOT_SCHEMA,
version=version,
expires=expiration_date,
meta=fileinfodict)






def generate_timestamp_metadata(snapshot_file_path, version, expiration_date,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this stays here? That kind of makes sense, but is a little asymmetric.

storage_backend, repository_name, use_length=True, use_hashes=True):
Expand Down Expand Up @@ -1758,7 +1609,7 @@ def generate_timestamp_metadata(snapshot_file_path, version, expiration_date,

snapshot_fileinfo = {}

length, hashes = _get_hashes_and_length_if_needed(use_length, use_hashes,
length, hashes = get_hashes_and_length_if_needed(use_length, use_hashes,
snapshot_file_path, storage_backend)

snapshot_filename = os.path.basename(snapshot_file_path)
Expand Down
Loading