-
-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathtest_commands_eyaml_rotate_keys.py
320 lines (288 loc) · 14.6 KB
/
test_commands_eyaml_rotate_keys.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import pytest
from tests.conftest import (
create_temp_yaml_file,
requireseyaml,
old_eyaml_keys,
new_eyaml_keys,
)
class Test_eyaml_rotate_keys():
    """Tests for the eyaml-rotate-keys command-line interface.

    Each test drives the console script through the ``script_runner``
    fixture and checks the result's ``success`` flag and ``stderr`` text.

    NOTE(review): the YAML documents embedded in these tests nest folded
    block scalars and child nodes with two-space indentation because YAML
    requires such content to be indented deeper than its parent key.  The
    exact indentation width of the upstream file could not be confirmed
    from the copy under review -- verify against the repository.
    """

    command = "eyaml-rotate-keys"

    def _run_rotation(self, script_runner, old_keys, new_keys, *targets,
                      extra_args=()):
        """Run the command with the given key pairs against *targets*.

        Args:
            script_runner: Fixture exposing ``run(argument_list)``.
            old_keys: Indexable pair; element 0 is passed to
                ``--oldprivatekey`` and element 1 to ``--oldpublickey``.
            new_keys: Indexable pair; element 0 is passed to
                ``--newprivatekey`` and element 1 to ``--newpublickey``.
            *targets: Positional arguments appended after every option.
            extra_args: Additional options placed after the key options
                and before ``targets`` (for example ``("--backup",)``).

        Returns:
            The object returned by ``script_runner.run``.
        """
        return script_runner.run([
            self.command,
            "--newprivatekey={}".format(new_keys[0]),
            "--newpublickey={}".format(new_keys[1]),
            "--oldprivatekey={}".format(old_keys[0]),
            "--oldpublickey={}".format(old_keys[1]),
            *extra_args,
            *targets,
        ])

    def test_no_options(self, script_runner):
        """Running the command without arguments fails and prints usage."""
        result = script_runner.run([self.command])
        assert not result.success, result.stderr
        assert "usage: {}".format(self.command) in result.stderr

    def test_duplicate_keys(self, script_runner):
        """Supplying the same path for old and new keys is rejected."""
        bunk_key = "/does/not/exist/on-most/systems"
        result = self._run_rotation(
            script_runner,
            (bunk_key, bunk_key),
            (bunk_key, bunk_key),
            bunk_key,
        )
        assert not result.success, result.stderr
        assert "The new and old EYAML keys must be different." in result.stderr

    def test_bad_keys(self, script_runner):
        """Distinct key paths that are not readable files are rejected."""
        bunk_file = "/does/not/exist/on-most/systems"
        bunk_old_key = "/does/not/exist/on-most/systems/old"
        bunk_new_key = "/does/not/exist/on-most/systems/new"
        result = self._run_rotation(
            script_runner,
            (bunk_old_key, bunk_old_key),
            (bunk_new_key, bunk_new_key),
            bunk_file,
        )
        assert not result.success, result.stderr
        assert "EYAML key is not a readable file:" in result.stderr

    @requireseyaml
    def test_no_yaml_files(self, script_runner, old_eyaml_keys, new_eyaml_keys):
        """Fixture-supplied keys with a bogus target path report 'Not a file:'."""
        bunk_file = "/does/not/exist/on-most/systems"
        result = self._run_rotation(
            script_runner, old_eyaml_keys, new_eyaml_keys, bunk_file)
        assert not result.success, result.stderr
        assert "Not a file:" in result.stderr

    @requireseyaml
    def test_good_multi_replacements(self, script_runner, tmp_path_factory, old_eyaml_keys, new_eyaml_keys, quiet_logger):
        """Rotate keys across two files and verify the re-encrypted results.

        Checks that both files are rewritten, that the rotated ``/block``
        value contains no space characters, and that ``/block`` decrypts to
        the same plain text before rotation (old keys) and after rotation
        (new keys).
        """
        from yamlpath.func import unwrap_node_coords
        from yamlpath.common import Parsers
        from yamlpath import Processor
        from yamlpath.eyaml import EYAMLProcessor

        simple_content = """---
encrypted_string: ENC[PKCS7,MIIBiQYJKoZIhvcNAQcDoIIBejCCAXYCAQAxggEhMIIBHQIBADAFMAACAQEwDQYJKoZIhvcNAQEBBQAEggEAHA4rPcTzvgzPLtnGz3yoyX/kVlQ5TnPXcScXK2bwjguGZLkuzv/JVPAsOm4t6GlnROpy4zb/lUMHRJDChJhPLrSj919B8//huoMgw0EU5XTcaN6jeDDjL+vhjswjvLFOux66UwvMo8sRci/e2tlFiam8VgxzV0hpF2qRrL/l84V04gL45kq4PCYDWrJNynOwYVbSIF+qc5HaF25H8kHq1lD3RB6Ob/J942Q7k5Qt7W9mNm9cKZmxwgtUgIZWXW6mcPJ2dXDB/RuPJJSrLsb1VU/DkhdgxaNzvLCA+MViyoFUkCfHFNZbaHKNkoYXBy7dLmoh/E5tKv99FeG/7CzL3DBMBgkqhkiG9w0BBwEwHQYJYIZIAWUDBAEqBBCVU5Mjt8+4dLkoqB9YArfkgCDkdIhXR9T1M4YYa1qTE6by61VPU3g1aMExRmo4tNZ8FQ==]
encrypted_block: >
  ENC[PKCS7,MIIBeQYJKoZIhvcNAQcDoIIBajCCAWYCAQAxggEhMIIBHQIBADAFMAACAQEw
  DQYJKoZIhvcNAQEBBQAEggEAnxQVqyIgRTb/+VP4Q+DLJcnlS8YPouXEW8+z
  it9uwUA02CEPxCEU944GcHpgTY3EEtkm+2Z/jgXI119VMML+OOQ1NkwUiAw/
  wq0vwz2D16X31XzhedQN5FZbfZ1C+2tWSQfCjE0bu7IeHfyR+k2ssD11kNZh
  JDEr2bM2dwOdT0y7VGcQ06vI9gw6UXcwYAgS6FoLm7WmFftjcYiNB+0EJSW0
  VcTn2gveaw9iOQcum/Grby+9Ybs28fWd8BoU+ZWDpoIMEceujNa9okIXNPJO
  jcvv1sgauwJ3RX6WFQIy/beS2RT5EOLhWIZCAQCcgJWgovu3maB7dEUZ0NLG
  OYUR7zA8BgkqhkiG9w0BBwEwHQYJYIZIAWUDBAEqBBAbO16EzQ5/cdcvgB0g
  tpKIgBAEgTLT5n9Jtc9venK0CKso]
"""
        anchored_content = """---
aliases:
  - &blockStyle >
    ENC[PKCS7,MIIBiQYJKoZIhvcNAQcDoIIBejCCAXYCAQAxggEhMIIBHQIBADAFMAACAQEw
    DQYJKoZIhvcNAQEBBQAEggEArvk6OYa1gACTdrWq2SpCrtGRlc61la5AGU7L
    aLTyKfqD9vqx71RDjobfOF96No07kLsEpoAJ+LKKHNjdG6kjvpGPmttj9Dkm
    XVoU6A+YCmm4iYFKD/NkoSOEyAkoDOXSqdjrgt0f37GefEsXt6cqAavDpUJm
    pmc0KI4TCG5zpfCxqttMs+stOY3Y+0WokkulQujZ7K3SdWUSHIysgMrWiect
    Wdg5unxN1A/aeyvhgvYSNPjU9KBco7SDnigSs9InW/QghJFrZRrDhTp1oTUc
    qK5lKvaseHkVGi91vPWeLQxZt1loJB5zL6j5BxMbvRfJK+wc3ax2u4x8WTAB
    EurCwzBMBgkqhkiG9w0BBwEwHQYJYIZIAWUDBAEqBBAwcy7jvcOGcMfLEtug
    LEXbgCBkocdckuDe14mVGmUmM++xN34OEVRCeGVWWUnWq1DJ4Q==]
  - &stringStyle ENC[PKCS7,MIIBiQYJKoZIhvcNAQcDoIIBejCCAXYCAQAxggEhMIIBHQIBADAFMAACAQEwDQYJKoZIhvcNAQEBBQAEggEAIu44u62q5sVfzC7kytLi2Z/EzH2DKr4vDsoqDBeSZ71aRku/uSrjyiO4lyoq9Kva+eBAyjBay5fnqPVBaU3Rud2pdEoZEoyofi02jn4hxUKpAO1W0AUgsQolGe53qOdM4U8RbwnTR0gr3gp2mCd18pH3SRMP9ryrsBAxGzJ6mR3RgdZnlTlqVGXCeWUeVpbH+lcHw3uvd+o/xkvJ/3ypxz+rWILiAZ3QlCirzn/qb2fHuKf3VBh8RVFuQDaM5voajZlgjD6KzNCsbATOqOA6eJI4j0ngPdDlIjGHAnahuyluQ5f5SIaIjLC+ZeCOfIYni0MQ+BHO0JNbccjq2Unb7TBMBgkqhkiG9w0BBwEwHQYJYIZIAWUDBAEqBBCYmAI0Ao3Ok1cSmVw0SgQGgCBK62z1r5RfRjf1xKfqDxTsGUHfsUmM3EjGJfnWzCRvuQ==]
block: *blockStyle
string: *stringStyle
yet_another:
  'more.complex.child': *blockStyle
"""
        simple_file = create_temp_yaml_file(tmp_path_factory, simple_content)
        anchored_file = create_temp_yaml_file(tmp_path_factory, anchored_content)

        result = self._run_rotation(
            script_runner, old_eyaml_keys, new_eyaml_keys,
            simple_file, anchored_file)
        assert result.success, result.stderr

        with open(simple_file, 'r') as fhnd:
            simple_data = fhnd.read()

        with open(anchored_file, 'r') as fhnd:
            anchored_data = fhnd.read()

        # Both files must have been rewritten by the rotation
        assert simple_data != simple_content
        assert anchored_data != anchored_content

        # Verify that block and string formatting is correct
        yaml = Parsers.get_yaml_editor()

        (yaml_rotated_data, doc_loaded) = Parsers.get_yaml_data(
            yaml, quiet_logger,
            anchored_data, literal=True)
        # When loading fails, an error message has already been logged
        assert doc_loaded, "Rotated anchored data failed to load (verify formatting)"

        source_processor = Processor(quiet_logger, yaml_rotated_data)
        for node in source_processor.get_nodes('/block', mustexist=True):
            assert ' ' not in unwrap_node_coords(node)

        # Test that the pre- and post-rotated values are identical
        (yaml_anchored_data, doc_loaded) = Parsers.get_yaml_data(
            yaml, quiet_logger,
            anchored_content, literal=True)
        assert doc_loaded, "Original anchored data failed to load (verify values)"

        (yaml_rotated_data, doc_loaded) = Parsers.get_yaml_data(
            yaml, quiet_logger,
            anchored_data, literal=True)
        assert doc_loaded, "Rotated anchored data failed to load (verify values)"

        # The original document is read with the old key pair
        source_processor = EYAMLProcessor(
            quiet_logger, yaml_anchored_data,
            privatekey=old_eyaml_keys[0],
            publickey=old_eyaml_keys[1])
        for node in source_processor.get_eyaml_values(
            '/block', True
        ):
            assert unwrap_node_coords(node) == 'This is a test value.'

        # The rotated document is read with the new key pair
        rotated_processor = EYAMLProcessor(
            quiet_logger, yaml_rotated_data,
            privatekey=new_eyaml_keys[0],
            publickey=new_eyaml_keys[1])
        for node in rotated_processor.get_eyaml_values(
            '/block', True
        ):
            assert unwrap_node_coords(node) == 'This is a test value.'

    def test_yaml_parsing_error(self, script_runner, imparsible_yaml_file, old_eyaml_keys, new_eyaml_keys):
        """The ``imparsible_yaml_file`` fixture yields 'YAML parsing error'."""
        result = self._run_rotation(
            script_runner, old_eyaml_keys, new_eyaml_keys,
            imparsible_yaml_file)
        assert not result.success, result.stderr
        assert "YAML parsing error" in result.stderr

    def test_yaml_syntax_error(self, script_runner, badsyntax_yaml_file, old_eyaml_keys, new_eyaml_keys):
        """The ``badsyntax_yaml_file`` fixture yields 'YAML syntax error'."""
        result = self._run_rotation(
            script_runner, old_eyaml_keys, new_eyaml_keys,
            badsyntax_yaml_file)
        assert not result.success, result.stderr
        assert "YAML syntax error" in result.stderr

    def test_yaml_composition_error(self, script_runner, badcmp_yaml_file, old_eyaml_keys, new_eyaml_keys):
        """The ``badcmp_yaml_file`` fixture yields 'YAML composition error'."""
        result = self._run_rotation(
            script_runner, old_eyaml_keys, new_eyaml_keys,
            badcmp_yaml_file)
        assert not result.success, result.stderr
        assert "YAML composition error" in result.stderr

    def test_corrupted_eyaml_value(self, script_runner, tmp_path_factory, old_eyaml_keys, new_eyaml_keys):
        """A value that looks like EYAML but is corrupted cannot be decrypted."""
        content = """---
key: >
  ENC[PKCS7,MII ... corrupted-value ...
  DBAEqBBAwcy7jvcOGcMfLEtugGVWWUnWq1DJ4Q==]
"""
        yaml_file = create_temp_yaml_file(tmp_path_factory, content)
        result = self._run_rotation(
            script_runner, old_eyaml_keys, new_eyaml_keys, yaml_file)
        assert not result.success, result.stderr
        assert "Unable to decrypt value!" in result.stderr

    def test_bad_recryption_key(self, script_runner, tmp_path_factory, old_eyaml_keys, new_eyaml_keys):
        """Re-encryption fails when the new key pair is supplied swapped."""
        content = """---
key: >
  ENC[PKCS7,MIIBmQYJKoZIhvcNAQcDoIIBijCCAYYCAQAxggEhMIIBHQIBADAFMAACAQEw
  DQYJKoZIhvcNAQEBBQAEggEAs+8byhxSVFkzPAfGFazxsifEJQO3RH5MNf2g
  o/x0oh+y1SB6bwB/lPtCBnCwDgKUKR8VzqWM8sTYkLTkSWq5BxS+Hix0zL1u
  zqdzNbuFDNS3PoUM4XaBRPOhGL/xUGc8EuUmdc3RaGRqisZvqACAMDDMme5m
  sCJVHw/QC//hAH6zrPmPA8D5S6ibMHGURifqTmLvi1BxxzMIWXWBmRpadAaq
  nYqhYsI/IWyQBmF7OAwsREREu+qEiDDBOS5IchDcDnlxtoooB5xin4HDS9ED
  MJMlKfpB1FCNtrC4RJz4uqFuwvX482cct3TtS+/UrPLP7rm6EILs7QSQGsdM
  G+8k8DBcBgkqhkiG9w0BBwEwHQYJYIZIAWUDBAEqBBD+Bx88oA/j57i5UB5U
  BHEogDDpFaKbtiGSTxOK44MpjLOGCZ4ME6lJz5EYVJQ3VJw95z98mvj6CgzL
  NI/TSIF7M9U=]
"""
        yaml_file = create_temp_yaml_file(tmp_path_factory, content)
        # Deliberately swapped: element 1 goes to --newprivatekey and
        # element 0 to --newpublickey.
        swapped_new_keys = (new_eyaml_keys[1], new_eyaml_keys[0])
        result = self._run_rotation(
            script_runner, old_eyaml_keys, swapped_new_keys, yaml_file)
        assert not result.success, result.stderr
        assert "unable to encrypt" in result.stderr or "cannot be run due to exit code: 1" in result.stderr

    def test_backup_file(self, script_runner, tmp_path_factory, old_eyaml_keys, new_eyaml_keys):
        """``--backup`` leaves a ``.bak`` file holding the original content."""
        import os

        content = """---
key: >
  ENC[PKCS7,MIIBiQYJKoZIhvcNAQcDoIIBejCCAXYCAQAxggEhMIIBHQIBADAFMAACAQEw
  DQYJKoZIhvcNAQEBBQAEggEAPGA1g1Wx50RK8F/Y118w1VT/SnCa7PMfN2OM
  d82vGeWXm6INmoURMDWEvBUEFCmGZoOMLVlK3LALtUcPEW1N9ztJTypBrqqI
  1K8L9aZWRNFt7uwsaoHWvk1XjMujP+nn2ZO3OiFYkiWFh0PcFw7cT1TmexB4
  cNbBtNi7oJ88L17/8rbtJW465cWyj0pPCmwo3OvK39JcuJ2xosujNk4u5AUf
  TjWwklk3yjPvjG6AvoS4TK+vkmqUcCkyy0tLZR8Xu+3IzYCq+DYH4QBrrrZf
  pKer9VawzMzxgVXeCgKGEsa3XeSzWtgbyoZVtoBdl3uv2f8rGi5qAlwZ9syO
  Aold9zBMBgkqhkiG9w0BBwEwHQYJYIZIAWUDBAEqBBDGUmDGJfp2Iqn7bATf
  r0H9gCBNamGg9iiM92wGcVSkNmGJtVk8yEe3EOVn/QNzQ6v0fw==]
"""
        yaml_file = create_temp_yaml_file(tmp_path_factory, content)
        backup_file = yaml_file + ".bak"
        result = self._run_rotation(
            script_runner, old_eyaml_keys, new_eyaml_keys, yaml_file,
            extra_args=("--backup",))
        assert result.success, result.stderr
        assert os.path.isfile(backup_file)

        with open(backup_file, 'r') as fhnd:
            filedat = fhnd.read()
        assert filedat == content

    def test_replace_backup_file(self, script_runner, tmp_path_factory, old_eyaml_keys, new_eyaml_keys):
        """``--backup`` overwrites a ``.bak`` file that already exists."""
        import os

        content = """---
key: >
  ENC[PKCS7,MIIBiQYJKoZIhvcNAQcDoIIBejCCAXYCAQAxggEhMIIBHQIBADAFMAACAQEw
  DQYJKoZIhvcNAQEBBQAEggEAPGA1g1Wx50RK8F/Y118w1VT/SnCa7PMfN2OM
  d82vGeWXm6INmoURMDWEvBUEFCmGZoOMLVlK3LALtUcPEW1N9ztJTypBrqqI
  1K8L9aZWRNFt7uwsaoHWvk1XjMujP+nn2ZO3OiFYkiWFh0PcFw7cT1TmexB4
  cNbBtNi7oJ88L17/8rbtJW465cWyj0pPCmwo3OvK39JcuJ2xosujNk4u5AUf
  TjWwklk3yjPvjG6AvoS4TK+vkmqUcCkyy0tLZR8Xu+3IzYCq+DYH4QBrrrZf
  pKer9VawzMzxgVXeCgKGEsa3XeSzWtgbyoZVtoBdl3uv2f8rGi5qAlwZ9syO
  Aold9zBMBgkqhkiG9w0BBwEwHQYJYIZIAWUDBAEqBBDGUmDGJfp2Iqn7bATf
  r0H9gCBNamGg9iiM92wGcVSkNmGJtVk8yEe3EOVn/QNzQ6v0fw==]
"""
        yaml_file = create_temp_yaml_file(tmp_path_factory, content)
        backup_file = yaml_file + ".bak"

        # Seed a pre-existing backup whose content differs from the source
        # file, so the final equality check proves it was replaced.
        with open(backup_file, 'w') as fhnd:
            fhnd.write(content + "\nkey2: plain scalar string value")

        result = self._run_rotation(
            script_runner, old_eyaml_keys, new_eyaml_keys, yaml_file,
            extra_args=("--backup",))
        assert result.success, result.stderr
        assert os.path.isfile(backup_file)

        with open(backup_file, 'r') as fhnd:
            filedat = fhnd.read()
        assert filedat == content