-
Notifications
You must be signed in to change notification settings - Fork 165
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
208 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
# -*- coding: utf-8 -*- | ||
# Copyright 2019 Google Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
"""File archiving processor""" | ||
|
||
from __future__ import unicode_literals | ||
|
||
import os | ||
import tarfile | ||
import logging | ||
|
||
from time import time | ||
from turbinia import TurbiniaException | ||
|
||
log = logging.getLogger('turbinia') | ||
|
||
|
||
def ValidateTarFile(compressed_directory): | ||
""" Validates a given compressed directory path. | ||
Performs a check to determine if the path exists and if | ||
the file extension is in the list of accepted extensions. | ||
Args: | ||
compressed_directory(str): The path to the compressed tar file. | ||
Raises: | ||
TurbiniaException: If validation fails. | ||
""" | ||
if not os.path.exists(compressed_directory): | ||
raise TurbiniaException( | ||
'The File or Directory does not exist: {0:s}'.format( | ||
compressed_directory)) | ||
|
||
# TODO(wyassine): rewrite this check so it is not dependant | ||
# on a list of hard coded extensions and instead have a | ||
# check to determine whether or not it is a tar file format. | ||
if not (compressed_directory.endswith('.tgz') or | ||
compressed_directory.endswith('.tar.gz')): | ||
raise TurbiniaException( | ||
'The file is not a supported format. The list of ' | ||
'acceptable exensions are: .tgz or .tar.gz') | ||
|
||
|
||
def CompressDirectory(uncompressed_directory): | ||
"""Compress a given directory into a tar file. | ||
Args: | ||
uncompressed_directory(str): The path to the uncompressed directory. | ||
Returns: | ||
str: The path to the tar file. | ||
""" | ||
# Error handling check for a non-existent file or directory. | ||
if not os.path.exists(uncompressed_directory): | ||
raise TurbiniaException( | ||
'The File or Directory does not exist: {0:s}'.format( | ||
uncompressed_directory)) | ||
|
||
# Iterate through a given list of files and compress them. | ||
compressed_directory = uncompressed_directory + '.tar.gz' | ||
try: | ||
with tarfile.TarFile.open(compressed_directory, 'w:gz') as tar: | ||
tar.add(uncompressed_directory, arcname='') | ||
tar.close() | ||
log.info( | ||
'The tar file has been created and ' | ||
'can be found at: {0:s}'.format(compressed_directory)) | ||
except IOError as exception: | ||
raise TurbiniaException('An error has occurred: {0:s}'.format(exception)) | ||
except tarfile.TarError as exception: | ||
raise TurbiniaException( | ||
'An error has while compressing the directory: {0:s}'.format(exception)) | ||
return compressed_directory | ||
|
||
|
||
def UncompressTarFile(compressed_directory, output_tmp): | ||
"""Uncompress a provided tar file. | ||
Args: | ||
compressed_directory(str): The path to the tar file. | ||
output_tmp(str): The path to the temporary directory that the | ||
uncompressed tar file will be placed into. | ||
Returns: | ||
str: The path to the uncompressed directory. | ||
""" | ||
# Tar file validation check | ||
ValidateTarFile(compressed_directory) | ||
|
||
# Generate the uncompressed directory path | ||
uncompressed_file = 'uncompressed-' + str(int(time())) | ||
uncompressed_directory = os.path.join(output_tmp, uncompressed_file) | ||
|
||
# Uncompress the tar file into the uncompressed directory. | ||
try: | ||
tar = tarfile.TarFile.open(compressed_directory) | ||
tar.extractall(path=uncompressed_directory) | ||
tar.close() | ||
log.info( | ||
'The tar file has been uncompressed to the following directory: {0:s}' | ||
.format(uncompressed_directory)) | ||
except IOError as exception: | ||
raise TurbiniaException('An error has occurred: {0:s}'.format(exception)) | ||
except tarfile.TarError as exception: | ||
raise TurbiniaException( | ||
'An error has occurred while uncompressing the tar ' | ||
'file: {0:s}'.format(exception)) | ||
return uncompressed_directory |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
# -*- coding: utf-8 -*- | ||
# Copyright 2019 Google Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
"""Tests for the Archive processor to compress and decompress folders.""" | ||
|
||
from __future__ import unicode_literals | ||
|
||
import os | ||
import tarfile | ||
import unittest | ||
import tempfile | ||
|
||
from random import randint | ||
from shutil import rmtree | ||
from turbinia.processors import archive | ||
from turbinia import TurbiniaException | ||
|
||
|
||
class ArchiveProcessorTest(unittest.TestCase): | ||
"""Tests for Archive Processor.""" | ||
|
||
def setUp(self): | ||
# Setup testing directories/variables. | ||
self.test_files = [] | ||
self.base_output_dir = tempfile.mkdtemp(prefix='turbinia-test-local') | ||
self.tmp_files_dir = os.path.join(self.base_output_dir, 'files') | ||
self.tmp_archive = os.path.join(self.base_output_dir, 'files.tar.gz') | ||
if not os.path.exists(self.tmp_files_dir): | ||
os.makedirs(self.tmp_files_dir) | ||
|
||
# Generate text files containing random numbers. | ||
file_max = 10 | ||
counter = 0 | ||
while counter <= file_max: | ||
file_name = 'file{0:s}.txt'.format(str(counter)) | ||
file_path = os.path.join(self.tmp_files_dir, file_name) | ||
file_open = open(file_path, 'w+') | ||
rand_nums = [randint(0, 1000) for i in range(50)] | ||
for i in rand_nums: | ||
file_open.write('%s\n' % str(i)) | ||
file_open.close() | ||
counter += 1 | ||
self.test_files.append(file_name) | ||
archive.CompressDirectory(self.tmp_files_dir) | ||
|
||
def tearDown(self): | ||
# Remove testing directory for this unit test. | ||
if os.path.exists(self.base_output_dir): | ||
rmtree(self.base_output_dir) | ||
|
||
def test_compressed_dir(self): | ||
"""Tests the compression function""" | ||
# Check if compressed directory matches expected output path. | ||
self.assertEqual( | ||
archive.CompressDirectory(self.tmp_files_dir), self.tmp_archive) | ||
|
||
# Check to confirm that the archive is gzip format. | ||
self.assertEqual(tarfile.is_tarfile(self.tmp_archive), True) | ||
|
||
# Raise assertion if folder does not exist. | ||
with self.assertRaises(TurbiniaException): | ||
archive.CompressDirectory('blah') | ||
|
||
def test_validate_tarfile(self): | ||
"""Tests the validate function used to decompress tar files""" | ||
|
||
# Raise exception for file that does not exist. | ||
with self.assertRaises(TurbiniaException): | ||
archive.ValidateTarFile('blah.no') | ||
|
||
# Raise exception for a file with unsupported extension. | ||
with self.assertRaises(TurbiniaException): | ||
archive.ValidateTarFile(self.tmp_files_dir) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |