Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CopyObject: add test for encrypted objects #595

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 172 additions & 0 deletions s3tests_boto3/functional/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -13915,3 +13915,175 @@ def test_post_object_upload_checksum():

r = requests.post(url, files=payload, verify=get_config_ssl_verify())
assert r.status_code == 400


#########################
# COPY ENCRYPTION TESTS #
#########################
_copy_enc_source_modes = {
'unencrypted': {
'marks': [pytest.mark.fails_on_aws],
},
'sse-s3': {
'args': {'ServerSideEncryption': 'AES256'},
'assert': lambda r: r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256',
'marks': [pytest.mark.sse_s3],
},
'sse-c': {
'args': {
'SSECustomerAlgorithm': 'AES256',
'SSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'SSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw==',
},
'source_copy_args': {
'CopySourceSSECustomerAlgorithm': 'AES256',
'CopySourceSSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'CopySourceSSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw==',
},
'assert': lambda r: (
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-algorithm'] == 'AES256' and
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-key-MD5'] == 'DWygnHRtgiJ77HCm+1rvHw=='
)
},
'sse-kms': {
'args': {
'ServerSideEncryption': 'aws:kms',
'SSEKMSKeyId': lambda: get_main_kms_keyid()
},
'assert': lambda r: (
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'aws:kms' and
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'] == get_main_kms_keyid()
)
}
}
_copy_enc_dest_modes = {
'unencrypted': {
'marks': [pytest.mark.fails_on_aws],
},
'sse-s3': {
'args': {'ServerSideEncryption': 'AES256'},
'assert': lambda r: r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256',
'marks': [pytest.mark.sse_s3],
},
'sse-c': {
'args': {
'SSECustomerAlgorithm': 'AES256',
'SSECustomerKey': '6b+WOZ1T3cqZMxgThRcXAQBrS5mXKdDUphvpxptl9/4=',
'SSECustomerKeyMD5': 'arxBvwY2V4SiOne6yppVPQ=='
},
'get_args': {
'SSECustomerAlgorithm': 'AES256',
'SSECustomerKey': '6b+WOZ1T3cqZMxgThRcXAQBrS5mXKdDUphvpxptl9/4=',
'SSECustomerKeyMD5': 'arxBvwY2V4SiOne6yppVPQ=='
},
'assert': lambda r: (
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-algorithm'] == 'AES256' and
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-key-MD5'] == 'arxBvwY2V4SiOne6yppVPQ=='
)
},
'sse-kms': {
'args': {
'ServerSideEncryption': 'aws:kms',
'SSEKMSKeyId': lambda: get_secondary_kms_keyid()
},
'assert': lambda r: (
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'aws:kms' and
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'] == get_secondary_kms_keyid()
)
}
}

def _test_copy_enc(file_size, source_mode_key, dest_mode_key):
source_args = _copy_enc_source_modes[source_mode_key]
dest_args = _copy_enc_dest_modes[dest_mode_key]

bucket_name = get_new_bucket()
client = get_client()

# upload original file with source encryption
data = 'A'*file_size
args = {key: value() if callable(value) else value for key, value in source_args.get('args', {}).items()}
response = client.put_object(Bucket=bucket_name, Key='testobj', Body=data, **args)
assert source_args.get('assert', lambda r: True)(response)

# copy the object to a new key, with destination encryption
dest_bucket_name = get_new_bucket()
copy_args = {key: value() if callable(value) else value for key, value in dest_args.get('args', {}).items()}
copy_args.update(source_args.get('source_copy_args', {}))
response = client.copy_object(Bucket=dest_bucket_name, Key='testobj2', CopySource={'Bucket': bucket_name, 'Key': 'testobj'}, **copy_args)
assert dest_args.get('assert', lambda r: True)(response)

# verify the copy is encrypted
get_args = dest_args.get('get_args', {})
response = client.get_object(Bucket=dest_bucket_name, Key='testobj2', **get_args)
assert dest_args.get('assert', lambda r: True)(response)
body = _get_body(response)
assert body == data

@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
pytest.param(
source_key,
dest_key,
marks=[
*_copy_enc_source_modes[source_key].get('marks', []),
*_copy_enc_dest_modes[dest_key].get('marks', [])
]
)
for source_key in _copy_enc_source_modes.keys()
for dest_key in _copy_enc_dest_modes.keys()
])
def test_copy_enc_1b(source_mode_key, dest_mode_key):
_test_copy_enc(1, source_mode_key, dest_mode_key)

@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
pytest.param(
source_key,
dest_key,
marks=[
*_copy_enc_source_modes[source_key].get('marks', []),
*_copy_enc_dest_modes[dest_key].get('marks', [])
]
)
for source_key in _copy_enc_source_modes.keys()
for dest_key in _copy_enc_dest_modes.keys()
])
def test_copy_enc_1kb(source_mode_key, dest_mode_key):
_test_copy_enc(1024, source_mode_key, dest_mode_key)

@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
pytest.param(
source_key,
dest_key,
marks=[
*_copy_enc_source_modes[source_key].get('marks', []),
*_copy_enc_dest_modes[dest_key].get('marks', [])
]
)
for source_key in _copy_enc_source_modes.keys()
for dest_key in _copy_enc_dest_modes.keys()
])
def test_copy_enc_1mb(source_mode_key, dest_mode_key):
_test_copy_enc(1024*1024, source_mode_key, dest_mode_key)

@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
pytest.param(
source_key,
dest_key,
marks=[
*_copy_enc_source_modes[source_key].get('marks', []),
*_copy_enc_dest_modes[dest_key].get('marks', [])
]
)
for source_key in _copy_enc_source_modes.keys()
for dest_key in _copy_enc_dest_modes.keys()
])
def test_copy_enc_8mb(source_mode_key, dest_mode_key):
_test_copy_enc(8*1024*1024, source_mode_key, dest_mode_key)