#!/usr/bin/env python3
"""Update the cache file listing remote store symlinks.
This manifest is used by aliBuild and the publishers to speed up cloning
the remote store.
There are two sorts of manifests:
A. Package-specific manifests (created by build_package_manifest)
These aggregate symlinks for each package, e.g. all symlinks under
'TARS/<arch>/O2/'. This is used by aliBuild to speed up building its
list of in-use revision numbers.
Here, it is important that revision numbers are never re-used, so we
must include all symlinks in each package directory, even if their
target does not exist any more.
B. Dist symlink manifests (created by build_dist_manifest)
These manifests aggregate all symlinks under a 'dist' directory, e.g.
'TARS/<arch>/dist-direct/'. The publishers use these manifests to avoid
having to walk the entire dist tree.
This speed-up only works if publishers can rely on the fact that if the
manifest contains any symlinks in a directory, it will contain *all*
symlinks from that directory, so they do not have to list the directory
themselves. Of course, this does not apply recursively, so the existence
of 'TARS/arch/dist/package/package-version/tarball.tar.gz' in the store
only implies that the publisher can skip listing the
'TARS/arch/dist/package/package-version/' directory itself.
Dist manifests are sensitive to partial uploads, since symlinks must be
uploaded one-by-one to S3, and this script could see the partially-built
symlink directory while this is ongoing.
However, publishers must always have a full list of dependencies for the
packages they create, else they will fail to publish some required
packages. This script must not add partially-built symlink directories
to its dist manifests.
aliBuild solves the partial-upload problem by uploading the 'store'
tarball to 'TARS/<arch>/store/' as the last step. Once that tarball is
present, this signals to publishers that they can pick up the dist
directories related to that store tarball.
Relying on this and the fact that all 'dist/package/package-version/'
directories will contain a symlink to the tarball for
'package-version.arch.tar.gz' itself, two rules will ensure that we
never publish a partially-uploaded dist directory:
1. If any symlinks' targets don't exist, skip the entire directory.
2. If the directory doesn't contain the tarball pointing to the named
package, skip the directory as well.
This is implemented in the get_dist_symlinks_for_package function.
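
Both manifest types contain one tab-separated record per symlink: package
manifests map the symlink's file name to its target, while dist manifests map
the full symlink key to its target. An illustrative (made-up) package manifest
record:

    zlib-v1.2.11-1.slc7_x86-64.tar.gz<TAB><target path stored in the symlink object>

A typical invocation (the architecture pattern is only an example) might be:

    AWS_ACCESS_KEY_ID=... AWS_SECRET_ACCESS_KEY=... ./update-symlink-manifests -v -a 'slc7_x86-64'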
"""
import argparse
import fnmatch
import gzip
import logging
import math
import os
import os.path
import queue
import tempfile
import threading

import boto3
import botocore.exceptions

LOG = logging.getLogger(__name__)
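# Extra-verbose log level below DEBUG, used for per-object tracing (enabled with -v -v).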
LOG_TRACE = logging.DEBUG // 2
PACKAGE_MANIFEST_EXT = ".manifest"
DIST_MANIFEST_EXT = ".manifest.gz"
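# Subdirectories of TARS/<arch>/ whose symlinks are aggregated into dist manifests (type B above).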
DIST_DIRS = ("/dist/", "/dist-direct/", "/dist-runtime/")


def main(args):
"""Script entry point. Set up threading and queue jobs."""
threading.current_thread().name = "main"
setup_logging(args.verbose)
if args.read_only:
LOG.info("read-only mode; will not modify any objects on S3")
s3c = create_s3_client(args.s3_endpoint_url)
LOG.debug("listing architectures...")
all_archs = [os.path.basename(arch_dir.rstrip("/"))
for arch_dir in list_subdirs(s3c, args.s3_bucket,
args.store_prefix)]
LOG.debug("found architectures: %r", all_archs)
architectures = fnmatch.filter(all_archs, args.architectures)
LOG.debug("found %d architectures matching pattern; listing stores...",
len(architectures))
# Prevent caching dist symlinks for partially-published packages (i.e.
# where dist tree is not completely uploaded yet). If the "main" tarball in
# /store is present, that means the package is fully uploaded. Therefore,
    # we need to know which tarballs are present, so fetch this list once.
# This variable is shared across threads, so fill it before worker threads
# might access it.
store_tarballs = frozenset(
tarball
for arch in architectures
for tarball in list_files(s3c, args.s3_bucket,
args.store_prefix + arch + "/store/",
recursive=True)
)
LOG.debug("found %d store tarballs in total", len(store_tarballs))
# Set up download workers in separate threads, to speed up downloading the
# many small, individual symlink files.
req_queue = queue.Queue(maxsize=256)
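    # Give worker threads zero-padded names (worker-0 ... worker-3 with the
    # default 4 threads) for the log formatter's %(threadName)s field.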
fmt = "worker-%0{}d".format(math.ceil(math.log10(args.download_threads)))
workers = [
threading.Thread(target=manifest_dispatcher, name=fmt % i,
daemon=True, # kill when main thread exits
args=(args, req_queue, store_tarballs))
for i in range(args.download_threads)
]
for worker in workers:
worker.start()
# Loop through packages and their individual symlinks, and queue each
# symlink for download.
for arch in architectures:
arch_path = args.store_prefix + arch + "/"
for subdir in list_subdirs(s3c, args.s3_bucket, arch_path):
if subdir.endswith("/store/"):
LOG.debug("skipped store directory: %s", subdir)
elif any(map(subdir.endswith, DIST_DIRS)):
LOG.debug("queuing dist dir: %s", subdir)
req_queue.put(("dist", subdir))
else: # this must be a package
LOG.debug("queuing package: %s", subdir)
req_queue.put(("package", subdir))
# We're done filling the queue, so add "quit" sentinels and wait
# for all remaining items to be done. Each thread pops only one
# "quit" sentinel off the queue.
for _ in workers:
req_queue.put(("quit", None))
for worker in workers:
worker.join()


def manifest_dispatcher(args, req_queue, store_tarballs):
"""Handle manifest creation requests and dispatch to the right function."""
s3c = create_s3_client(args.s3_endpoint_url)
while True:
type_, subdir = req_queue.get()
if type_ == "package":
build_package_manifest(s3c, args.s3_bucket, subdir, args.read_only)
elif type_ == "dist":
build_dist_manifest(s3c, args.s3_bucket, store_tarballs, subdir,
args.read_only)
elif type_ == "quit":
req_queue.task_done()
break
else:
LOG.warning("unknown request type: %r; ignoring", type_)
req_queue.task_done()


def build_package_manifest(s3c, bucket, package, read_only):
"""Create a symlink manifest for a single package."""
symlinks = {}
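    # e.g. package 'TARS/<arch>/O2/' -> manifest 'TARS/<arch>/O2.manifest'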
manifest = package.rstrip("/") + PACKAGE_MANIFEST_EXT
# First, fetch the existing manifest (if any) for this package.
try:
lines = read_object(s3c, bucket, manifest).splitlines()
except botocore.exceptions.ClientError as err:
# Treat a missing manifest like an empty one; i.e., use only the
# individual symlinks.
LOG.info("error while fetching %s: %s; recreating from scratch",
manifest, err)
else:
LOG.info("found existing manifest %s", manifest)
for i, line in enumerate(lines):
link_key, sep, target = line.partition("\t")
if sep and link_key and target:
symlinks[link_key] = target.rstrip("\n")
else:
LOG.warning("%s:%d: ignored malformed line: %r",
manifest, i + 1, line)
LOG.debug("%s: found %d records", manifest, len(symlinks))
# Now go through the individual symlinks to fill out the new manifest.
have_changes = False
for linkpath in list_files(s3c, bucket, package):
        package_name = os.path.basename(package.rstrip("/"))
        if not os.path.basename(linkpath).startswith(package_name):
            LOG.warning("rejected symlink: not for package %s: %s",
                        package_name, linkpath)
            continue
if not linkpath.endswith(".tar.gz"):
LOG.warning("rejected symlink: not a tarball: %s", linkpath)
continue
linkname = os.path.basename(linkpath)
if linkname in symlinks:
LOG.debug("symlink already cached; not re-reading: %s", linkpath)
continue
target = read_object(s3c, bucket, linkpath).rstrip("\r\n")
        LOG.log(LOG_TRACE, "read symlink: %s -> %s", linkpath, target)
symlinks[linkname] = target
have_changes = True
if not have_changes:
LOG.debug("no new symlinks for %s; skipping upload", manifest)
return
# Now write out the new manifest.
# We must have a trailing newline at the end of the content, so that
# e.g. `curl | while read` won't ignore the last line.
content = "".join("%s\t%s\n" % (name, target)
for name, target in symlinks.items())
if read_only:
LOG.info("read-only mode; would've written %d records (%d bytes) to %s",
len(symlinks), len(content), manifest)
for i, line in enumerate(content.splitlines()):
LOG.log(LOG_TRACE, "%s:%d: %s", manifest, i + 1, line)
else:
LOG.info("writing %d records (%d bytes) to %s",
len(symlinks), len(content), manifest)
put_object(s3c, bucket, manifest, content.encode("utf-8"))


def get_dist_symlinks_for_package(s3c, bucket, package_path, store_tarballs):
"""Return symlinks and their targets in the given path.
PATH should be of the form TARS/ARCH/dist*/PACKAGE/PACKAGE-VERSION/.
"""
_, arch, *_ = package_path.split("/")
# Prune dist directories where we don't have a symlink to
# the package itself, which indicates that the set of symlinks
# is incomplete. aliBuild is probably still uploading it.
package_and_version = os.path.basename(package_path.rstrip("/"))
main_tar_name = f"{package_and_version}.{arch}.tar.gz"
if main_tar_name not in map(os.path.basename, store_tarballs):
LOG.info("rejected dist symlinks in %s due to incomplete upload: "
"main tarball not in store: %s", package_path, main_tar_name)
return {}
LOG.debug("directory %s not yet cached; listing...", package_path)
symlink_keys = frozenset(list_files(s3c, bucket, package_path))
if package_path + main_tar_name not in symlink_keys:
LOG.info("rejected dist symlinks in %s due to incomplete upload: main "
"tarball not found here: %s", package_path, main_tar_name)
return {}
# Fetch symlink targets.
symlink_targets = {}
for link_key in symlink_keys:
if not link_key.endswith(".tar.gz"):
LOG.warning("rejected symlink: not a tarball: %s", link_key)
continue
target = read_object(s3c, bucket, link_key).rstrip("\r\n")
LOG.log(LOG_TRACE, "read symlink: %s -> %s", link_key, target)
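        # Symlink targets may be stored as relative paths (e.g. starting with
        # './' or '../'); strip those leading characters before comparing with
        # the full store keys.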
if target.lstrip("./") not in store_tarballs:
# If any symlink's target isn't present in /store, aliBuild is
# probably still uploading the relevant tarballs, and we can't
# rely on this directory being complete. Skip it for now.
LOG.info("rejected dist symlinks in %s due to incomplete upload: "
"target of %s -> %s not found in store",
package_path, link_key, target)
return {}
symlink_targets[link_key] = target
LOG.debug("found %d new dist symlinks for %s",
len(symlink_targets), package_path)
return symlink_targets


def build_dist_manifest(s3c, bucket, store_tarballs, dir_name, read_only):
"""Build a symlink manifest for the given dist subtree."""
manifest = dir_name.rstrip("/") + DIST_MANIFEST_EXT
_, arch, *_ = manifest.split("/")
    # First, fetch the existing manifest (if any) for this dist directory.
cached_symlinks = {}
try:
LOG.log(LOG_TRACE, "get_object(%r)", manifest)
manifest_object = s3c.get_object(Bucket=bucket, Key=manifest)["Body"]
except botocore.exceptions.ClientError as err:
# Treat a missing manifest like an empty one; i.e., use only the
# individual symlinks.
LOG.info("error while fetching %s: %s; recreating from scratch",
manifest, err)
else:
LOG.info("found existing manifest %s", manifest)
with gzip.open(manifest_object, "rb") as gzip_file:
for i, line in enumerate(gzip_file):
link_key, sep, target = line.decode("utf-8").partition("\t")
if sep and link_key and target:
cached_symlinks[link_key] = target.rstrip("\n")
else:
LOG.warning("%s:%d: ignored malformed line: %r",
manifest, i + 1, line)
LOG.debug("%s: found %d records", manifest, len(cached_symlinks))
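    # Package-version directories whose symlinks are already covered by the
    # existing manifest.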
cached_directories = frozenset(map(os.path.dirname, cached_symlinks))
LOG.debug("found %d already-cached directories in %s",
len(cached_directories), manifest)
# Look for directories of symlinks that we haven't seen before and
# add them to the cache.
new_symlinks = []
for package_path in list_subdirs(s3c, bucket, dir_name):
for package_version_path in list_subdirs(s3c, bucket, package_path):
# If we've cached this subdirectory before, skip it.
if package_version_path.rstrip("/") in cached_directories:
LOG.debug("directory %s already cached; skipping",
package_version_path)
continue
symlinks = get_dist_symlinks_for_package(
s3c, bucket, package_version_path, store_tarballs,
)
            if symlinks:
                LOG.debug("adding %d safe new dist symlinks from %s",
                          len(symlinks), package_version_path)
                new_symlinks.extend(symlinks)
                cached_symlinks.update(symlinks)
del cached_directories
if not new_symlinks:
LOG.debug("no new symlinks for %s; skipping upload", manifest)
return
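    # Write the compressed manifest to a temporary file first, so its
    # compressed size is known and it can be re-read from the start for upload.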
with tempfile.TemporaryFile("w+b") as buffer:
with gzip.open(buffer, "wt") as gzip_file:
for name, target in sorted(cached_symlinks.items()):
print(name, target, sep="\t", file=gzip_file)
if read_only:
LOG.info("read-only mode; would've written %d records (%d bytes "
"compressed) to %s",
len(cached_symlinks), buffer.tell(), manifest)
buffer.seek(0) # let gzip.open read the whole thing
with gzip.open(buffer, "rt") as gzip_file:
for i, line in enumerate(gzip_file):
LOG.log(LOG_TRACE, "%s:%d: %s", manifest, i + 1, line)
else:
LOG.info("writing %d records (%d bytes compressed) to %s",
len(cached_symlinks), buffer.tell(), manifest)
buffer.seek(0) # let put_object read the whole thing
put_object(s3c, bucket, manifest, buffer)


def read_object(s3c, bucket, key):
"""Return the full contents of the specified object as a str."""
LOG.log(LOG_TRACE, "read_object(%r)", key)
return s3c.get_object(Bucket=bucket, Key=key)["Body"] \
.read().decode("utf-8")


def list_subdirs(s3c, bucket, prefix):
"""Generate subdirectory names under prefix."""
LOG.log(LOG_TRACE, "list_subdirs(%r)", prefix)
for page in s3c.get_paginator("list_objects_v2").paginate(
Bucket=bucket, Delimiter="/", Prefix=prefix):
for item in page.get("CommonPrefixes", ()):
yield item["Prefix"]


def list_files(s3c, bucket, prefix, *, recursive=False):
"""Find and generate file names under prefix.
With recursive=False (the default), only return file names directly
under prefix. Otherwise, also search subdirectories of prefix for
files and return them.
"""
LOG.log(LOG_TRACE, "list_files(%r, recursive=%r)", prefix, recursive)
args = {} if recursive else {"Delimiter": "/"}
for page in s3c.get_paginator("list_objects_v2").paginate(
Bucket=bucket, Prefix=prefix, **args):
for item in page.get("Contents", ()):
yield item["Key"]


def put_object(s3c, bucket, key, contents):
"""Write an object to S3 at the given key."""
LOG.log(LOG_TRACE, "put_object(%r)", key)
s3c.put_object(Bucket=bucket, Key=key, Body=contents)


def create_s3_client(endpoint_url):
"""Create a boto3 client for S3."""
return boto3.client(
"s3", endpoint_url=endpoint_url,
aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"])


def setup_logging(verbose):
"""Set up the global logger for this script."""
logging.addLevelName(LOG_TRACE, "TRACE")
logger_handler = logging.StreamHandler()
logger_handler.setFormatter(logging.Formatter(
"%(filename)s:%(threadName)s:%(levelname)s: %(message)s"))
LOG.addHandler(logger_handler)
if verbose > 1: # -v -v
LOG.setLevel(LOG_TRACE)
elif verbose: # -v
LOG.setLevel(logging.DEBUG)
else: # default
LOG.setLevel(logging.INFO)


def parse_args():
"""Parse and return command-line arguments."""
parser = argparse.ArgumentParser(description=__doc__, epilog="""\
S3 credentials are read from the AWS_ACCESS_KEY_ID and
AWS_SECRET_ACCESS_KEY environment variables. These are required.
""")
parser.add_argument(
"-v", "--verbose", action="count", default=0,
help="show debug logging output; show tracing output when given twice")
parser.add_argument(
"-r", "--read-only", "-n", "--dry-run", action="store_true",
help="don't write new manifests to S3")
parser.add_argument(
"-j", "--download-threads", default=4, type=int, metavar="N",
help="fetch symlinks using %(metavar)s threads (default %(default)r)")
parser.add_argument(
"-p", "--store-prefix", default="TARS/", metavar="PREFIX/",
help="path prefix on S3 with trailing '/' (default %(default)r)")
parser.add_argument(
"-a", "--architectures", default="*", metavar="GLOB",
help="only process architectures matching this fnmatch/glob pattern "
"(default %(default)r to match all architectures present on S3)")
parser.add_argument(
"--s3-bucket", default="alibuild-repo", metavar="BUCKET",
help="S3 bucket to read (default %(default)r)")
parser.add_argument(
"--s3-endpoint-url", default="https://s3.cern.ch", metavar="ENDPOINT",
help="base URL of the S3 API (default %(default)r)")
return parser.parse_args()


if __name__ == "__main__":
main(parse_args())