[builder] normalized layer improvements #884

Merged 20 commits on Dec 21, 2023
Changes from 18 commits
docs/cellxgene_census_schema.md: 4 changes (3 additions & 1 deletion)
@@ -724,7 +724,9 @@ Per the CELLxGENE dataset schema, [all RNA assays MUST include UMI or read count
This is an experimental data artifact - it may be removed at any time.

A library-size normalized layer, containing a normalized variant of the count (raw) matrix.
For a value `X[i,j]` in the counts (raw) matrix, library-size normalized values are defined
For Smart-Seq assays, given a value `X[i,j]` in the counts (raw) matrix, library-size normalized values are defined
as `normalized[i,j] = (X[i,j] / var[j].feature_length) / sum(X[i, ] / var.feature_length)`.
For all other assays, for a value `X[i,j]` in the counts (raw) matrix, library-size normalized values are defined
as `normalized[i,j] = X[i,j] / sum(X[i, ])`.
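
To make the two definitions concrete, here is a minimal numpy sketch (illustrative only; the dense toy `X` and `feature_length` values are hypothetical, not part of the schema):

```python
import numpy as np

# Toy raw counts (2 cells x 3 genes) and per-gene feature lengths.
X = np.array([[4.0, 0.0, 6.0],
              [3.0, 0.0, 9.0]], dtype=np.float32)
feature_length = np.array([100.0, 200.0, 300.0], dtype=np.float32)

# Smart-Seq assays: divide by gene length first, then by the per-cell sum.
length_normalized = X / feature_length
smart_seq_normalized = length_normalized / length_normalized.sum(axis=1, keepdims=True)

# All other assays: divide by the per-cell (library size) sum only.
other_normalized = X / X.sum(axis=1, keepdims=True)
```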
Collaborator

Should the schema also mention the precision reduction being performed? Or are we satisfied that its near-equivalence justifies not having to mention it? (I did see Pablo's verification.)

Contributor Author

This PR does not introduce precision reduction, just improves it. I leave this up to @pablo-gar to resolve at a later date, at his discretion.


#### Feature metadata – `census_obj["census_data"][organism].ms["RNA"].var` – `SOMADataFrame`
@@ -16,6 +16,7 @@

import anndata
import attrs
import numba
import numpy as np
import numpy.typing as npt
import pandas as pd
@@ -35,7 +36,6 @@
CENSUS_OBS_TERM_COLUMNS,
CENSUS_VAR_PLATFORM_CONFIG,
CENSUS_VAR_TERM_COLUMNS,
CENSUS_X_LAYER_NORMALIZED_FLOAT_SCALE_FACTOR,
CENSUS_X_LAYERS,
CENSUS_X_LAYERS_PLATFORM_CONFIG,
CXG_OBS_TERM_COLUMNS,
@@ -361,8 +361,11 @@ def write_X_normalized(self, args: CensusBuildArgs) -> None:
return

logging.info(f"Write X normalized: {self.name} - starting")
# grab the previously calculated sum of the X['raw'] layer
raw_sum = self.obs_df.raw_sum.to_numpy()
is_smart_seq = np.isin(
self.obs_df.assay_ontology_term_id.to_numpy(), ["EFO:0008930", "EFO:0008931", "EFO:0700016"]
)
assert self.var_df is not None
feature_length = self.var_df.feature_length.to_numpy()

if args.config.multi_process:
STRIDE = 1_000_000 # controls TileDB fragment size, which impacts consolidation time
@@ -371,7 +374,9 @@ def write_X_normalized(self, args: CensusBuildArgs) -> None:
n_workers = n_workers_from_memory_budget(args, mem_budget)
with create_process_pool_executor(args, max_workers=n_workers) as pe:
futures = [
pe.submit(_write_X_normalized, (self.experiment.uri, start_id, STRIDE, raw_sum))
pe.submit(
_write_X_normalized, (self.experiment.uri, start_id, STRIDE, feature_length, is_smart_seq)
)
for start_id in range(0, self.n_obs, STRIDE)
]
for n, f in enumerate(concurrent.futures.as_completed(futures), start=1):
@@ -382,7 +387,7 @@ def write_X_normalized(self, args: CensusBuildArgs) -> None:
log_process_resource_status()

else:
_write_X_normalized((self.experiment.uri, 0, self.n_obs, raw_sum))
_write_X_normalized((self.experiment.uri, 0, self.n_obs, feature_length, is_smart_seq))

logging.info(f"Write X normalized: {self.name} - finished")
log_process_resource_status()
@@ -731,30 +736,16 @@ def reopen_experiment_builders(
yield eb


def _write_X_normalized(args: Tuple[str, int, int, npt.NDArray[np.float32]]) -> None:
def _write_X_normalized(args: Tuple[str, int, int, npt.NDArray[np.int64], npt.NDArray[np.bool_]]) -> None:
"""
Helper for ExperimentBuilder.write_X_normalized.

Read indicated rows from X['raw'], write to X['normalized']
"""
experiment_uri, obs_joinid_start, n, raw_sum = args
experiment_uri, obs_joinid_start, n, feature_length, is_smart_seq = args
logging.info(f"Write X normalized - starting block {obs_joinid_start}")

"""
Adjust normalized layer to never encode zero-valued cells where the raw count
value is greater than zero. In our current schema configuration, FloatScaleFilter
reduces the precision of each value, storing ``round((raw_float - offset) / factor)``
as a four byte int.

To ensure non-zero raw values, which would _normally_ scale to zero under
these conditions, we add the smallest possible sigma to each value (note that
zero valued coordinates are not stored, as this is a sparse array).

Reducing the above transformation, and assuming float32 values, the smallest sigma is
1/2 of the scale factor (bits of precision). Accounting for IEEE float precision,
this reduces to:
"""
sigma = 0.5 * (CENSUS_X_LAYER_NORMALIZED_FLOAT_SCALE_FACTOR + np.finfo(np.float32).epsneg)
sigma = np.finfo(np.float32).smallest_subnormal
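# Added annotation (not part of the diff): np.finfo(np.float32).smallest_subnormal is
# 2**-149 (~1.4e-45). It is used below, in _norm_it, to replace any normalized value that
# rounds to exactly zero, so that a non-zero raw count never becomes an implicit
# (unstored) zero in the sparse normalized layer.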

with soma.open(
urlcat(experiment_uri, "ms", MEASUREMENT_RNA_NAME, "X", "raw"), mode="r", context=SOMA_TileDB_Context()
@@ -766,27 +757,88 @@ def _write_X_normalized(args: Tuple[str, int, int, npt.NDArray[np.float32]]) ->
) as X_normalized:
with create_thread_pool_executor(max_workers=8) as pool:
lazy_reader = EagerIterator(
    X_raw.read(coords=(slice(obs_joinid_start, obs_joinid_start + n - 1),)).tables(),
    pool=pool,
)
lazy_divider = EagerIterator(
    (
        (
            X_tbl["soma_dim_0"],
            X_tbl["soma_dim_1"],
            X_tbl["soma_data"].to_numpy() / raw_sum[X_tbl["soma_dim_0"]] + sigma,
        )
        for X_tbl in lazy_reader
    ),
    pool=pool,
)
for soma_dim_0, soma_dim_1, soma_data in lazy_divider:
    assert np.all(soma_data > 0.0), "Found unexpected zero value in raw layer data"
    X_normalized.write(
        pa.Table.from_arrays(
            [soma_dim_0, soma_dim_1, soma_data],
            names=["soma_dim_0", "soma_dim_1", "soma_data"],
        )
    )
lazy_reader = EagerIterator(
    (
        (tbl, obs_indices.to_numpy())
        for tbl, (obs_indices, _) in X_raw.read(
            coords=(slice(obs_joinid_start, obs_joinid_start + n - 1),)
        )
        .blockwise(axis=0, reindex_disable_on_axis=[1])
        .tables()
    ),
    pool=pool,
)

def _norm_it(
    tbl: pa.Table, obs_indices: npt.NDArray[np.int64]
) -> Tuple[
    npt.NDArray[np.int64], npt.NDArray[np.int64], npt.NDArray[np.float32], npt.NDArray[np.int64]
]:
    d0: npt.NDArray[np.int64] = tbl["soma_dim_0"].to_numpy()
    d1: npt.NDArray[np.int64] = tbl["soma_dim_1"].to_numpy()
    data: npt.NDArray[np.float32] = tbl["soma_data"].to_numpy()
    data = _normalize(d0, d1, data, is_smart_seq[obs_indices], feature_length)
    data = _roundHalfToEven(data, keepbits=15)
    data[data == 0] = sigma
    return d0, d1, data, obs_indices

lazy_norm_reader = EagerIterator(
    (_norm_it(tbl, obs_indices) for tbl, obs_indices in lazy_reader), pool=pool
)

for d0, d1, data, obs_indices in lazy_norm_reader:
    X_normalized.write(
        pa.Table.from_pydict(
            {
                "soma_dim_0": obs_indices[d0],
                "soma_dim_1": d1,
                "soma_data": data,
            }
        )
    )

logging.info(f"Write X normalized - finished block {obs_joinid_start}")


@numba.jit(nopython=True, nogil=True) # type: ignore[misc] # See https://github.com/numba/numba/issues/7424
def _normalize(
d0: npt.NDArray[np.int64],
d1: npt.NDArray[np.int64],
data: npt.NDArray[np.float32],
is_smart_seq: npt.NDArray[np.bool_],
feature_length: npt.NDArray[np.float32],
) -> npt.NDArray[np.float32]:
# normalize and sum COO data along rows (assumes the block contains full rows along axis 0)
norm_data = np.where(is_smart_seq[d0], data / feature_length[d1], data)
row_sum = np.zeros((d0.max() + 1,), dtype=np.float64)
for i in range(len(d0)):
row_sum[d0[i]] += norm_data[i]

result = np.empty_like(data)
for i in range(len(d0)):
result[i] = norm_data[i] / row_sum[d0[i]]

return result
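

# Illustrative example (added annotation, not part of this PR): what _normalize computes
# for a tiny COO block. Cell 0 is Smart-Seq, so its counts are divided by feature_length
# before the row-sum division; cell 1 gets plain library-size normalization.
#
#   d0 = np.array([0, 0, 1, 1], dtype=np.int64)                   # cell (row) indices
#   d1 = np.array([0, 2, 1, 2], dtype=np.int64)                   # gene (column) indices
#   data = np.array([4, 6, 3, 9], dtype=np.float32)               # raw counts
#   is_smart_seq = np.array([True, False])                        # per-cell assay flag
#   feature_length = np.array([100, 200, 300], dtype=np.float32)  # per-gene lengths
#   _normalize(d0, d1, data, is_smart_seq, feature_length)
#   # cell 0: (4/100)/(4/100 + 6/300) = 0.667, (6/300)/(4/100 + 6/300) = 0.333
#   # cell 1: 3/(3 + 9) = 0.25, 9/(3 + 9) = 0.75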


@numba.jit(nopython=True, nogil=True) # type: ignore[misc] # See https://github.com/numba/numba/issues/7424
def _roundHalfToEven(a: npt.NDArray[np.float32], keepbits: int) -> npt.NDArray[np.float32]:
"""
Generate reduced precision floating point array, with round half to even.
IMPORTANT: In-place operation.

Ref: https://gmd.copernicus.org/articles/14/377/2021/gmd-14-377-2021.html
"""
assert a.dtype is np.dtype(np.float32) # code below assumes IEEE 754 float32
nmant = 23
bits = 32
if keepbits < 1 or keepbits >= nmant:
return a
maskbits = nmant - keepbits
full_mask = (1 << bits) - 1
mask = (full_mask >> maskbits) << maskbits
half_quantum1 = (1 << (maskbits - 1)) - 1

b = a.view(np.int32)
b += ((b >> maskbits) & 1) + half_quantum1
b &= mask
return a
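
A quick illustrative check of the precision reduction (not part of the PR; assumes the `_roundHalfToEven` function defined above is in scope): keeping 15 of the 23 float32 mantissa bits changes each value by at most half a quantum, a relative error of roughly 2**-16.

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.random(1_000, dtype=np.float32) + 0.5  # arbitrary positive float32 values
y = _roundHalfToEven(x.copy(), keepbits=15)    # pass a copy: the rounding is in place

# Keeping 15 of the 23 mantissa bits bounds the relative error by ~2**-16.
assert np.allclose(x, y, rtol=2**-15, atol=0.0)
```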
@@ -184,37 +184,12 @@
"tiledb": {
"create": {
"capacity": 2**16,
"dims": {
"soma_joinid": {
"filters": [
"DoubleDeltaFilter",
{"_type": "ZstdFilter", "level": 19},
]
}
},
"dims": {"soma_joinid": {"filters": ["DoubleDeltaFilter", {"_type": "ZstdFilter", "level": 19}]}},
"attrs": {
**{
k: {
"filters": [
{"_type": "ZstdFilter", "level": 19},
]
}
for k in _StringLabelVar
},
**{
k: {
"filters": [
"ByteShuffleFilter",
{"_type": "ZstdFilter", "level": 9},
]
}
for k in _NumericVar
},
**{k: {"filters": [{"_type": "ZstdFilter", "level": 19}]} for k in _StringLabelVar},
**{k: {"filters": ["ByteShuffleFilter", {"_type": "ZstdFilter", "level": 9}]} for k in _NumericVar},
},
"offsets_filters": [
"DoubleDeltaFilter",
{"_type": "ZstdFilter", "level": 19},
],
"offsets_filters": ["DoubleDeltaFilter", {"_type": "ZstdFilter", "level": 19}],
"allows_duplicates": True,
}
}
@@ -229,26 +204,10 @@
"create": {
"capacity": 2**16,
"dims": {
"soma_dim_0": {
"tile": 2048,
"filters": [{"_type": "ZstdFilter", "level": 5}],
},
"soma_dim_1": {
"tile": 2048,
"filters": [
"ByteShuffleFilter",
{"_type": "ZstdFilter", "level": 5},
],
},
},
"attrs": {
"soma_data": {
"filters": [
"ByteShuffleFilter",
{"_type": "ZstdFilter", "level": 5},
]
}
"soma_dim_0": {"tile": 2048, "filters": [{"_type": "ZstdFilter", "level": 5}]},
"soma_dim_1": {"tile": 2048, "filters": ["ByteShuffleFilter", {"_type": "ZstdFilter", "level": 5}]},
},
"attrs": {"soma_data": {"filters": ["ByteShuffleFilter", {"_type": "ZstdFilter", "level": 5}]}},
"cell_order": "row-major",
"tile_order": "row-major",
"allows_duplicates": True,
@@ -264,20 +223,7 @@
"tiledb": {
"create": {
**CENSUS_DEFAULT_X_LAYERS_PLATFORM_CONFIG["tiledb"]["create"],
"attrs": {
"soma_data": {
"filters": [
{
"_type": "FloatScaleFilter",
"factor": CENSUS_X_LAYER_NORMALIZED_FLOAT_SCALE_FACTOR,
"offset": 0.5,
"bytewidth": 4,
},
"ByteShuffleFilter",
{"_type": "ZstdFilter", "level": 5},
]
}
},
"attrs": {"soma_data": {"filters": [{"_type": "ZstdFilter", "level": 19}]}},
}
}
},
@@ -313,6 +259,13 @@
"EFO:0700016", # Smart-seq v4
]

# Smart-Seq has special handling in the "normalized" X layers
SMART_SEQ = [
"EFO:0008930", # Smart-seq
"EFO:0008931", # Smart-seq2
"EFO:0700016", # Smart-seq v4
]

DONOR_ID_IGNORE = ["pooled", "unknown"]

# Feature_reference values which are ignored (not considered) in