Analysis of chunk size, compressor and filters on 1000 genomes data #74
-
I can take a look at this and run the experiments.
-
Note above that the
-
I've added the generalised
So, it seems like we're doing a pretty good job here on everything except the PL fields (which is discussed in #53). In particular, we're getting a compression ratio of 110 on
@shz9, is this something you'd like to look into?
-
Update: here's the same for chr2:
Overall, the pattern is basically the same.
-
Would an approach like the following work for testing this systematically, without generating schemas?
import itertools
import zarr
import numcodecs
import pandas as pd
import numpy as np
from tqdm import tqdm
def generate_chunksize_variations(shape, min_chunksize=1000):
"""
Generate variations for the chunk sizes
"""
dim_chunksizes = []
for sdim in shape:
if sdim <= min_chunksize:
dim_chunksizes.append([sdim])
else:
dim_chunksizes.append([min_chunksize])
while True:
if dim_chunksizes[-1][-1]*10 < sdim:
dim_chunksizes[-1].append(dim_chunksizes[-1][-1]*10)
else:
break
dim_chunksizes[-1].append(sdim)
return list(itertools.product(*dim_chunksizes))
def generate_blosc_variations():
"""
Generate variations for the Blosc compressor
"""
from numcodecs.blosc import list_compressors
compressors = list_compressors()
bit_shuffle = list(range(-1, 3))
c_levels = [5, 7, 9]
return [
{'cname': cn, 'shuffle': bits, 'clevel': cl}
for cn, bits, cl in itertools.product(compressors, bit_shuffle, c_levels)
]
def test_compression_variations(z_arr):
"""
Test the impact of compressor options + chunksizes on compression ratios
of Zarr arrays.
"""
chunksize_var = generate_chunksize_variations(z_arr.shape)
blosc_var = generate_blosc_variations()
comp_results_table = []
for cs_var, bcomp_var in tqdm(itertools.product(chunksize_var, blosc_var),
total=len(chunksize_var)*len(blosc_var),
desc='Compression variations'):
new_compressor = numcodecs.Blosc(**bcomp_var)
z2 = zarr.empty_like(z_arr, chunks=cs_var, compressor=new_compressor)
z2[:] = z_arr[:]
comp_results_table.append({**bcomp_var,
**{'chunksize': cs_var,
'CompressionRatio': float(dict(z2.info_items())['Storage ratio'])}})
# If the array is of type bool, try the PackBits codec:
if z_arr.dtype == bool:
for cs_var in chunksize_var:
z2 = zarr.empty_like(z_arr, chunks=cs_var, compressor=numcodecs.PackBits())
z2[:] = z_arr[:]
comp_results_table.append({
'cname': 'PackBits', 'shuffle': None, 'clevel': None,
'chunksize': cs_var,
'CompressionRatio': float(dict(z2.info_items())['Storage ratio'])
})
return pd.DataFrame(comp_results_table).sort_values('CompressionRatio', ascending=False)
def test_vcf2zarr_compression_variations(z_group, keys=None):
"""
Test the impact of compressor options + chunksizes on compression ratios
of vcf2zarr zarr arrays.
"""
keys = keys or list(z_group.keys())
tabs = []
for k in keys:
tabs.append(test_compression_variations(z_group[k]))
tabs[-1]['ArrayName'] = k
return pd.concat(tabs)
I tested this on my end with the tiny VCF that I've been working with and it seems to work. I don't have a lot of samples/variants in this file, so I couldn't test many variations of the chunk sizes. I'll see if I can test it on larger files over the weekend. This is how I invoke it on already converted VCF files (if you want to test it on your converted files):
import zarr
z = zarr.open("tmp/sample.zarr/")
res = test_vcf2zarr_compression_variations(z, ['call_PL', 'call_genotype', 'call_genotype_mask'])
I get something like this:
cname shuffle clevel chunksize CompressionRatio ArrayName
52 zstd 0 7 (1000, 91, 28) 24.0 call_PL
112 zstd 0 7 (6515, 91, 28) 24.0 call_PL
115 zstd 1 7 (6515, 91, 28) 22.9 call_PL
109 zstd -1 7 (6515, 91, 28) 22.9 call_PL
55 zstd 1 7 (1000, 91, 28) 22.8 call_PL
49 zstd -1 7 (1000, 91, 28) 22.8 call_PL
...
cname shuffle clevel chunksize CompressionRatio ArrayName
112 zstd 0 7 (6515, 91, 2) 32.5 call_genotype
115 zstd 1 7 (6515, 91, 2) 32.5 call_genotype
49 zstd -1 7 (1000, 91, 2) 31.2 call_genotype
58 zstd 2 7 (1000, 91, 2) 31.2 call_genotype
109 zstd -1 7 (6515, 91, 2) 30.4 call_genotype
118 zstd 2 7 (6515, 91, 2) 30.4 call_genotype
52 zstd 0 7 (1000, 91, 2) 29.9 call_genotype
55 zstd 1 7 (1000, 91, 2) 29.9 call_genotype
No huge variations in compression ratio from varying the
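In case it helps with reading these tables, here's one way to summarise the results DataFrame (assuming the res object returned by the invocation shown earlier):
# Best compression ratio per array and codec/shuffle setting, across all chunk sizes tested.
summary = (res.groupby(['ArrayName', 'cname', 'shuffle'])['CompressionRatio']
           .max()
           .reset_index()
           .sort_values(['ArrayName', 'CompressionRatio'], ascending=[True, False]))
print(summary)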
-
It turns out Alistair did a really nice analysis on this in 2016 for genotypes. His conclusion is that zstd is great, and the bit shuffle filter adds quite a bit of extra compression. We should probably adopt this as our default for genotypes. Can you try bit shuffle, @shz9? I think the open question here is how we can do better with things like GQ and DP, which aren't compressing very well currently (again, ignoring PL as it requires special treatment).
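For reference, here's a minimal sketch of what that configuration looks like with numcodecs/zarr; the shape, dtype and clevel=7 are just assumptions for illustration:
import numcodecs
import numpy as np
import zarr
# Genotype-like data: small integers, shape (variants, samples, ploidy).
gt = np.random.randint(0, 2, size=(10_000, 1_000, 2), dtype=np.int8)
# zstd with the bit shuffle filter enabled inside Blosc.
compressor = numcodecs.Blosc(cname="zstd", clevel=7, shuffle=numcodecs.Blosc.BITSHUFFLE)
z = zarr.array(gt, chunks=(10_000, 1_000, 2), compressor=compressor)
print(dict(z.info_items())["Storage ratio"])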
-
Here are some preliminary results on this from my analysis of
Here's the latest version of the script I'm using, if anyone else would like to experiment:
import itertools
import zarr
import numcodecs
import pandas as pd
import numpy as np
from tqdm import tqdm
import argparse
def generate_shape_permutations(shape):
return list(itertools.permutations(range(len(shape))))
def generate_chunksize_variations(shape,
min_chunksize=(1000, 100),
max_chunksize=(10_000, None)):
"""
Generate variations for the chunk sizes
"""
if isinstance(min_chunksize, int):
min_chunksize = list(np.repeat(min_chunksize, len(shape)))
if isinstance(max_chunksize, int):
max_chunksize = list(np.repeat(max_chunksize, len(shape)))
dim_chunksizes = []
for sdim, min_cs, max_cs in itertools.zip_longest(shape, min_chunksize, max_chunksize):
if min_cs is None:
min_cs = sdim
if max_cs is None or max_cs > sdim:
max_cs = sdim
if sdim <= min_cs:
dim_chunksizes.append([sdim])
else:
dim_chunksizes.append([min_cs])
while True:
if dim_chunksizes[-1][-1] * 10 <= max_cs:
dim_chunksizes[-1].append(dim_chunksizes[-1][-1] * 10)
else:
break
return list(itertools.product(*dim_chunksizes))
def generate_blosc_variations():
"""
Generate variations for the Blosc compressor
"""
from numcodecs.blosc import list_compressors
compressors = ['zstd'] # list_compressors()
bit_shuffle = list(range(-1, 3))
c_levels = [7] #[5, 7, 9]
return [
{'cname': cn, 'shuffle': bits, 'clevel': cl}
for cn, bits, cl in itertools.product(compressors, bit_shuffle, c_levels)
]
def test_compression_variations(z_arr,
max_chunks=10,
min_chunksize=(1000, 100),
max_chunksize=(10_000, None),
dry_run=False):
"""
Test the impact of compressor options + chunksizes on compression ratios
of Zarr arrays.
:param z_arr: A Zarr array.
:param max_chunks: Only extract up to `max_chunks` from the original array for processing
(to save memory)
:param max_chunksize: The maximum chunksize (along any dimension) to test.
:param dry_run: If True, generate the variations table without copying any data.
"""
shape_perm = generate_shape_permutations(z_arr.shape)
chunksize_var = generate_chunksize_variations(z_arr.shape,
min_chunksize=min_chunksize,
max_chunksize=max_chunksize)
blosc_var = generate_blosc_variations()
comp_results_table = []
# Determine the shape to extract based on the chunksize variations and
# the max_chunks parameter:
max_idx = np.minimum(np.array(chunksize_var).max(axis=0) * max_chunks,
np.array(z_arr.shape))
print(max_idx)
slices = tuple(slice(0, n) for n in max_idx)
if not dry_run:
# Extract the data
arr = z_arr[slices]
for cs_var, bcomp_var, dim_order in tqdm(itertools.product(chunksize_var, blosc_var, shape_perm),
total=len(chunksize_var) * len(blosc_var) * len(shape_perm),
desc='Compression variations'):
reshaped_chunks = tuple(np.array(cs_var)[np.array(dim_order)])
if not dry_run:
arr_reshaped = arr.transpose(dim_order)
new_compressor = numcodecs.Blosc(**bcomp_var)
z2 = zarr.empty_like(arr_reshaped,
chunks=reshaped_chunks,
compressor=new_compressor,
dtype=z_arr.dtype)
z2[:] = arr_reshaped
n_chunks = z2.nchunks
compress_ratio = float(dict(z2.info_items())['Storage ratio'])
else:
n_chunks = None
compress_ratio = None
comp_results_table.append({**bcomp_var,
**{'chunksize': reshaped_chunks,
'nchunks': n_chunks,
'dim_order': dim_order,
'CompressionRatio': compress_ratio}})
return pd.DataFrame(comp_results_table).sort_values('CompressionRatio', ascending=False)
def test_vcf2zarr_compression_variations(z_group,
keys=None,
max_chunks=10,
min_chunksize=(1000, 100),
max_chunksize=(10_000, None),
dry_run=False):
"""
Test the impact of compressor options + chunksizes on compression ratios
of vcf2zarr zarr arrays.
:param z_group: A Zarr group
:param keys: A list of keys to test the compression variations on. If None,
it tests on all the Zarr array in the hierarchy.
:param max_chunks: Only extract up to `max_chunks` from the original array for processing
(to save memory)
:param max_chunksize: The maximum chunksize (along any dimension) to test.
:param dry_run: If True, generate the variations table without copying any data.
"""
keys = keys or list(z_group.keys())
tabs = []
for k in keys:
print("Testing:", k)
tabs.append(test_compression_variations(z_group[k],
max_chunks=max_chunks,
min_chunksize=min_chunksize,
max_chunksize=max_chunksize,
dry_run=dry_run))
tabs[-1]['ArrayName'] = k
return pd.concat(tabs)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='vcf2zarr compression benchmarks')
parser.add_argument('-i', '--input', dest='input_zarr_group', type=str, required=True,
help='The path to the input Zarr group.')
parser.add_argument('-o', '--output', dest='output_file', type=str, required=True,
help='The path to the output CSV file.')
parser.add_argument('--keys', dest='keys', type=str, nargs='+', default=None,
help='The keys to test the compression variations on. If not provided, '
'it tests on all the Zarr arrays in the hierarchy.')
parser.add_argument('--max-chunks', dest='max_chunks', type=int, default=10,
help='Only extract up to `max_chunks` from the original array for processing.')
parser.add_argument('--dry-run', dest='dry_run', action='store_true', default=False,
help='If True, generate the variations table without copying any data.')
args = parser.parse_args()
results_df = test_vcf2zarr_compression_variations(zarr.open(args.input_zarr_group),
keys=args.keys,
max_chunks=args.max_chunks,
dry_run=args.dry_run)
results_df.to_csv(args.output_file, index=False)
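For reference, this is how the script can be invoked from the command line (assuming it's saved as compression_benchmarks.py; the Zarr path and keys here are just placeholders):
python compression_benchmarks.py -i tmp/sample.zarr -o compression_results.csv --keys call_DP call_GQ --max-chunks 10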
-
Here are the compression results for all the
I'm also including a version of the plot that only has the dimensions in the default order.
From these results, here are my recommendations for the compressors/chunk sizes for these fields:
I think what remains to explore here is the idea of applying filters before compression, such as PackBits for boolean fields or Quantize for floats.
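As a rough sketch of what applying such filters would look like (the shapes, dtypes and the 1-decimal-digit quantization are assumptions for illustration):
import numcodecs
import numpy as np
import zarr
compressor = numcodecs.Blosc(cname="zstd", clevel=7, shuffle=numcodecs.Blosc.BITSHUFFLE)
# Boolean field (e.g. call_genotype_mask): pack 8 booleans per byte before compressing.
mask = np.random.rand(10_000, 1_000, 2) < 0.01
z_mask = zarr.array(mask, chunks=(10_000, 1_000, 2), filters=[numcodecs.PackBits()], compressor=compressor)
# Float field: keep roughly 1 decimal digit of precision before compressing (lossy).
vals = np.random.rand(10_000, 1_000).astype(np.float32)
z_vals = zarr.array(vals, chunks=(10_000, 1_000), filters=[numcodecs.Quantize(1, vals.dtype)], compressor=compressor)
print(dict(z_mask.info_items())["Storage ratio"], dict(z_vals.info_items())["Storage ratio"])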
-
Here are the results for the
It seems to help quite substantially, especially when
For
This quantization may be too aggressive, but it doubles/triples the compression ratio, especially when coupled with
Finally, here's the final version of the benchmarking script (compression_benchmarks.py):
import itertools
import zarr
import numcodecs
import pandas as pd
import numpy as np
from tqdm import tqdm
import argparse
def generate_shape_permutations(shape):
return list(itertools.permutations(range(len(shape))))
def generate_chunksize_variations(shape,
min_chunksize=(1000, 100),
max_chunksize=(10_000, None)):
"""
Generate variations for the chunk sizes
"""
if isinstance(min_chunksize, int):
min_chunksize = list(np.repeat(min_chunksize, len(shape)))
if isinstance(max_chunksize, int):
max_chunksize = list(np.repeat(max_chunksize, len(shape)))
dim_chunksizes = []
for sdim, min_cs, max_cs in itertools.zip_longest(shape, min_chunksize, max_chunksize):
if min_cs is None:
min_cs = sdim
if max_cs is None or max_cs > sdim:
max_cs = sdim
if sdim <= min_cs:
dim_chunksizes.append([sdim])
else:
dim_chunksizes.append([min_cs])
while True:
if dim_chunksizes[-1][-1] * 10 <= max_cs:
dim_chunksizes[-1].append(dim_chunksizes[-1][-1] * 10)
else:
break
return list(itertools.product(*dim_chunksizes))
def generate_blosc_variations():
"""
Generate variations for the Blosc compressor
"""
from numcodecs.blosc import list_compressors
compressors = ['zstd'] # list_compressors()
bit_shuffle = list(range(3))
c_levels = [7] #[5, 7, 9]
return [
{'cname': cn, 'shuffle': bits, 'clevel': cl}
for cn, bits, cl in itertools.product(compressors, bit_shuffle, c_levels)
]
def test_compression_variations(z_arr,
max_chunks=10,
min_chunksize=(1000, 100),
max_chunksize=(10_000, None),
dry_run=False):
"""
Test the impact of compressor options + chunksizes on compression ratios
of Zarr arrays.
:param z_arr: A Zarr array.
:param max_chunks: Only extract up to `max_chunks` from the original array for processing
(to save memory)
:param max_chunksize: The maximum chunksize (along any dimension) to test.
:param dry_run: If True, generate the variations table without copying any data.
"""
shape_perm = generate_shape_permutations(z_arr.shape)
chunksize_var = generate_chunksize_variations(z_arr.shape,
min_chunksize=min_chunksize,
max_chunksize=max_chunksize)
blosc_var = generate_blosc_variations()
comp_results_table = []
# Determine the shape to extract based on the chunksize variations and
# the max_chunks parameter:
max_idx = np.minimum(np.array(chunksize_var).max(axis=0) * max_chunks,
np.array(z_arr.shape))
print(max_idx)
slices = tuple(slice(0, n) for n in max_idx)
if not dry_run:
# Extract the data
arr = z_arr[slices]
for cs_var, bcomp_var, dim_order in tqdm(itertools.product(chunksize_var, blosc_var, shape_perm),
total=len(chunksize_var) * len(blosc_var) * len(shape_perm),
desc='Compression variations'):
reshaped_chunks = tuple(np.array(cs_var)[np.array(dim_order)])
if not dry_run:
arr_reshaped = arr.transpose(dim_order)
new_compressor = numcodecs.Blosc(**bcomp_var)
if z_arr.filters is not None:
object_codec = z_arr.filters[0]
else:
object_codec = None
z2 = zarr.empty_like(arr_reshaped,
chunks=reshaped_chunks,
compressor=new_compressor,
dtype=z_arr.dtype,
object_codec=object_codec)
z2[:] = arr_reshaped
n_chunks = z2.nchunks
compress_ratio = float(dict(z2.info_items())['Storage ratio'])
else:
n_chunks = None
compress_ratio = None
comp_results_table.append({**bcomp_var,
**{'chunksize': reshaped_chunks,
'nchunks': n_chunks,
'dim_order': dim_order,
'CompressionRatio': compress_ratio}})
# Test the combination of filters + compressor:
if z_arr.dtype == bool:
if not dry_run:
z2 = zarr.empty_like(arr_reshaped,
chunks=reshaped_chunks,
compressor=new_compressor,
filters=[numcodecs.PackBits()],
dtype=z_arr.dtype,
object_codec=object_codec)
z2[:] = arr_reshaped
n_chunks = z2.nchunks
compress_ratio = float(dict(z2.info_items())['Storage ratio'])
else:
n_chunks = None
compress_ratio = None
comp_results_table.append({**bcomp_var,
**{'chunksize': reshaped_chunks,
'nchunks': n_chunks,
'dim_order': dim_order,
'CompressionRatio': compress_ratio}})
comp_results_table[-1]['cname'] += '+PackBits'
elif np.issubdtype(z_arr.dtype, np.floating):
if not dry_run:
z2 = zarr.empty_like(arr_reshaped,
chunks=reshaped_chunks,
compressor=new_compressor,
filters=[numcodecs.Quantize(1, arr.dtype)],
dtype=z_arr.dtype,
object_codec=object_codec)
z2[:] = arr_reshaped
n_chunks = z2.nchunks
compress_ratio = float(dict(z2.info_items())['Storage ratio'])
else:
n_chunks = None
compress_ratio = None
comp_results_table.append({**bcomp_var,
**{'chunksize': reshaped_chunks,
'nchunks': n_chunks,
'dim_order': dim_order,
'CompressionRatio': compress_ratio}})
comp_results_table[-1]['cname'] += '+Quantize'
return pd.DataFrame(comp_results_table).sort_values('CompressionRatio', ascending=False)
def test_vcf2zarr_compression_variations(z_group,
keys=None,
max_chunks=10,
min_chunksize=(1000, 100),
max_chunksize=(10_000, None),
dry_run=False):
"""
Test the impact of compressor options + chunksizes on compression ratios
of vcf2zarr zarr arrays.
:param z_group: A Zarr group
:param keys: A list of keys to test the compression variations on. If None,
it tests on all the Zarr array in the hierarchy.
:param max_chunks: Only extract up to `max_chunks` from the original array for processing
(to save memory)
:param max_chunksize: The maximum chunksize (along any dimension) to test.
:param dry_run: If True, generate the variations table without copying any data.
"""
keys = keys or list(z_group.keys())
tabs = []
for k in keys:
print("Testing:", k)
tabs.append(test_compression_variations(z_group[k],
max_chunks=max_chunks,
min_chunksize=min_chunksize,
max_chunksize=max_chunksize,
dry_run=dry_run))
tabs[-1]['ArrayName'] = k
return pd.concat(tabs)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='vcf2zarr compression benchmarks')
parser.add_argument('-i', '--input', dest='input_zarr_group', type=str, required=True,
help='The path to the input Zarr group.')
parser.add_argument('-o', '--output', dest='output_file', type=str, required=True,
help='The path to the output CSV file.')
parser.add_argument('--keys', dest='keys', type=str, nargs='+', default=None,
help='The keys to test the compression variations on. If not provided, '
'it tests on all the Zarr arrays in the hierarchy.')
parser.add_argument('--max-chunks', dest='max_chunks', type=int, default=10,
help='Only extract up to `max_chunks` from the original array for processing.')
parser.add_argument('--dry-run', dest='dry_run', action='store_true', default=False,
help='If True, generate the variations table without copying any data.')
args = parser.parse_args()
results_df = test_vcf2zarr_compression_variations(zarr.open(args.input_zarr_group),
keys=args.keys,
max_chunks=args.max_chunks,
dry_run=args.dry_run)
results_df.to_csv(args.output_file, index=False)
And the plotting functions/utils (plotting_functions.py):
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import ast
arr = np.array(['v', 's', 'p'], dtype=str)  # labels for the (variants, samples, ploidy) dimensions
def map_chunksize(row):
arr_r = arr[np.array(ast.literal_eval(row.dim_order))]
arr_c = np.array(ast.literal_eval(row.chunksize)).astype(str)
return ",".join([r + c for r, c in zip(arr_r, arr_c)])
def extract_chunksize_per_dim(row):
arr_r = list(ast.literal_eval(row.dim_order))
arr_c = list(ast.literal_eval(row.chunksize))
return pd.Series([arr_c[arr_r.index(0)], arr_c[arr_r.index(1)]])
df = pd.read_csv("PATH_TO_OUTPUT_CSV_FILE")
df['named_chunksize'] = df.apply(map_chunksize, axis=1)
df[['variant_cs', 'sample_cs']] = df.apply(extract_chunksize_per_dim, axis=1)
df = df.loc[df.shuffle != -1]
df_plot = df.loc[df.dim_order.isin(['(0, 1)', '(0, 1, 2)'])]
g = sns.catplot(df_plot, kind='bar', row='cname', col='shuffle',
x='named_chunksize', y='CompressionRatio', sharex=False, sharey='row')
for ax in g.fig.axes:  # rotate the x tick labels on every facet
ax.set_xticklabels(ax.get_xticklabels(), rotation=90)
plt.subplots_adjust(hspace=1.)
plt.show()
-
One thing I wondered about slightly idly, @shz9, is whether having power-of-two sized chunks would make any difference. In principle, aligning with OS page boundaries (usually 4 KiB) may give an overall performance improvement. In practice, it probably doesn't make much difference. If it's not too much hassle, I wonder if we could try something close to 10_000 x 1000 which is 4 KiB aligned?
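A quick sketch of the arithmetic involved, assuming int8 genotype calls with ploidy 2 (8192 x 1024 is just one nearby power-of-two candidate):
import numpy as np
PAGE = 4096  # typical OS page size in bytes
for chunks in [(10_000, 1_000, 2), (8_192, 1_024, 2)]:
    nbytes = int(np.prod(chunks)) * np.dtype(np.int8).itemsize
    print(chunks, nbytes, "4 KiB aligned:", nbytes % PAGE == 0)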
-
It would be very useful to have a high-level overview of the effect of chunk size, compressor and filters on some recent VCF data. The 1000 genomes NYGC resequencing data is as good a choice as any.
Let's just look at the FORMAT fields, as the 1D stuff doesn't really contribute much and makes no real difference. For chr20 I have:
As discussed in #53, the PL values are a major outlier here. Let's exclude those too.
So, for the remaining fields, what I'd like to do is systematically try out combinations of
A good way to proceed here would be to
vcf2zarr encode -s [your schema] --max-variant-chunks=10
to encode the first 10 variant chunks. We should get enough information from the first 10 variant chunks, but we can make that bigger if it seems necessary.
(Note the generalised inspect command for Zarr datasets discussed in #39 would be useful for step 5 here. Basically it would just use the zarr info method and tabulate the output.)
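As a rough sketch of what that tabulation could look like (not the actual inspect implementation; the Zarr path is a placeholder):
import pandas as pd
import zarr
def tabulate_zarr_info(path):
    # One row per array, built from zarr's info_items() output.
    rows = []
    for name, arr in zarr.open(path, mode="r").arrays():
        info = dict(arr.info_items())
        rows.append({"ArrayName": name,
                     "Shape": info["Shape"],
                     "Chunks": info["Chunk shape"],
                     "Compressor": info["Compressor"],
                     "StorageRatio": info["Storage ratio"]})
    return pd.DataFrame(rows)
print(tabulate_zarr_info("tmp/sample.zarr"))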