-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdcsm.py
executable file
·251 lines (206 loc) · 8.14 KB
/
dcsm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
import datetime
import os
import os.path
import string
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Optional
import re
import yaml
@dataclass
class FileInfo:
"""Info about a specific file we use here"""
path: Path
modified: Optional[datetime.datetime] = None
@classmethod
def from_env(cls, var: str) -> Optional['FileInfo']:
"""Loads file info from given enviroment variable"""
name = os.environ.get(var)
if not name:
return None
file = cls(path=Path(name).resolve())
if file.exists:
file.modified = datetime.datetime.fromtimestamp(
file.path.stat().st_mtime
)
return file
@property
def exists(self) -> bool:
"""Does this file exist?"""
return self.path.is_file()
def __str__(self) -> str:
return str(self.path)
@dataclass
class Files:
"""Info about all of our necessary files"""
keyfile: Optional[FileInfo] = field(
default_factory=lambda: FileInfo.from_env('DCSM_KEYFILE'))
secrets: Optional[FileInfo] = field(
default_factory=lambda: FileInfo.from_env('DCSM_SECRETS_FILE'))
source: Optional[FileInfo] = field(
default_factory=lambda: FileInfo.from_env('DCSM_SOURCE_FILE'))
def check_set(self, keyfile: bool = False, secrets: bool = False, source: bool = False) -> None:
"""Check that the given file is set"""
if keyfile and not self.keyfile:
raise ValueError("variable DCSM_KEYFILE is required")
if secrets and not self.secrets:
raise ValueError("variable DCSM_SECRETS_FILE is required")
if source and not self.source:
raise ValueError("variable DCSM_SOURCE_FILE is required")
def check_exists(self, keyfile: bool = False, secrets: bool = False, source: bool = False) -> None:
"""Check that the given file exists"""
self.check_set(keyfile, secrets, source)
assert self.keyfile and self.secrets and self.source
if keyfile and not self.keyfile.exists:
raise ValueError(f'DCSM_KEYFILE {self.keyfile} does not exist')
if secrets and not self.secrets.exists:
raise ValueError(f'DCSM_SECRETS_FILE {self.secrets} does not exist')
if source and not self.source.exists:
raise ValueError(f'DCSM_SOURCE_FILE {self.source} does not exist')
class DCSMTemplate(string.Template):
delimiter = '$DCSM'
flags = re.RegexFlag(value=0)
pattern = r"""
(?P<escaped>\$\$DCSM) |
(?:\$DCSM_(?P<named>[A-Z][A-Z0-9_]*)) |
(?:\$DCSM\{(?P<braced>[a-zA-Z][a-zA-Z0-9_]*)\}) |
(?P<invalid>$DCSM(?:\{\})?)
""" # type: ignore
def get_secrets(files: Files) -> Dict[str, Any]:
"""Return the secrets as a dictionary"""
files.check_exists(keyfile=True, secrets=True)
assert files.keyfile and files.secrets
process = subprocess.run(
['age', '--decrypt', '--identity', files.keyfile.path, files.secrets.path],
env={},
capture_output=True,
)
if process.returncode != 0:
raise ValueError(f'age decryption failed: {process.stderr.decode("utf-8")}')
output = process.stdout.decode('utf-8')
secrets: Dict[str, Any] = yaml.safe_load(output)
for key, value in secrets.items():
if not isinstance(key, str):
raise ValueError(f'secret key {key} is not a string')
return secrets
def template_file(source: Path, secrets: Dict[str, Any]) -> None:
"""Process a template file, inserting secrets"""
dest = source.with_suffix('')
with source.open() as template:
try:
result = DCSMTemplate(template.read()).substitute(secrets)
except Exception as e:
raise ValueError(f'error processing {source}: {e}')
with dest.open('w') as output:
output.write(result)
# copy ownership and permissions from source to dest
stat = source.stat()
os.chown(dest, stat.st_uid, stat.st_gid)
os.chmod(dest, stat.st_mode)
def template_dir(dirname: str, secrets: Dict[str, Any]) -> int:
"""Process all template files in the directory"""
processed = 0
for root, dirs, files in os.walk(dirname):
dir = Path(root).absolute()
for filename in files:
if filename.endswith('.template'):
template_file(dir.joinpath(filename), secrets)
processed += 1
return processed
def encrypt(files: Files) -> None:
"""Encrypt the source file into the secrets file"""
files.check_set(secrets=True)
files.check_exists(keyfile=True, source=True)
assert files.keyfile and files.secrets and files.source
source_is_newer = False
if not files.secrets.exists:
source_is_newer = True
elif files.source.modified and files.secrets.modified:
source_is_newer = files.source.modified > files.secrets.modified
if not source_is_newer:
raise ValueError('encrypted secrets are newer than secrets source; will not overwrite')
process = subprocess.run(
['age', '--encrypt', '--identity', files.keyfile.path, '--output', files.secrets.path, files.source.path],
env={},
capture_output=True,
)
if process.returncode != 0:
raise ValueError(f'age encryption failed: {process.stderr.decode("utf-8")}')
print(f"successfully encrypted source file {files.source.path} => {files.secrets.path}")
def decrypt(files: Files) -> None:
"""Decrypt the secrets file back out to the source file"""
files.check_set(source=True)
files.check_exists(keyfile=True, secrets=True)
assert files.keyfile and files.secrets and files.source
secrets_newer = False
if not files.source.exists:
secrets_newer = True
elif files.source.modified and files.secrets.modified:
secrets_newer = files.secrets.modified > files.source.modified
if not secrets_newer:
raise ValueError('secret source file is newer than encrypted secrets file; will not overwrite')
process = subprocess.run(
['age', '--decrypt', '--identity', files.keyfile.path, '--output', files.source.path, files.secrets.path],
env={},
capture_output=True,
)
if process.returncode != 0:
raise ValueError(f'age decryption failed: {process.stderr.decode("utf-8")}')
print(f"successfully decrypted secrets file {files.secrets.path} -> {files.source.path}")
print("don't forget to re-encrypt and remove the source file!")
def run(files: Files) -> None:
"""Process all template files"""
secrets = get_secrets(files)
processed = 0
for key, dirname in os.environ.items():
if not key.startswith('DCSM_TEMPLATE_'):
continue
if not os.path.exists(dirname):
raise ValueError(f'DCSM_TEMPLATE_{key} {dirname} does not exist')
processed += template_dir(dirname, secrets)
print(f"successfully processed {processed} template files")
def keygen(files: Files) -> None:
"""Generate a key file"""
files.check_set(keyfile=True)
assert files.keyfile
if files.keyfile.exists:
raise ValueError(f'key file {files.keyfile} already exists')
process = subprocess.run(
['age-keygen', '--output', files.keyfile.path],
env={},
capture_output=True,
)
if process.returncode != 0:
raise ValueError(f'age-keygen failed: {process.stderr.decode("utf-8")}')
print(f"successfully generated key file {files.keyfile.path}")
def main() -> None:
"""DCSM entry point"""
usage = "Usage: dcsm <run|encrypt|decrypt|keygen>"
try:
task = sys.argv[1]
except IndexError:
print(usage)
sys.exit(1)
# we always need the keyfile no matter what we're doing
files = Files()
if task == "run":
run(files)
elif task == "keygen":
keygen(files)
elif task == "encrypt":
encrypt(files)
elif task == "decrypt":
decrypt(files)
else:
print(usage)
sys.exit(1)
if __name__ == "__main__":
try:
main()
except Exception as e:
print("Unexpected error: {}".format(e))
sys.exit(1)
else:
sys.exit(0)