Skip to content

Commit

Permalink
Allow custom SAI index component discovery
Browse files Browse the repository at this point in the history
Refactor the code that "discover" which SAI components files an SSTable
has to allow making that code customizable. The default behavior remains
the existing one, that is the TOC file is consulted, with a fallback to
scanning disk if the TOC file is missing or corrupted.

The way to customize this behavior is similar to what `SSTableWatcher`
does: a new system property, `-Dcassandra.sai.custom_components_discovery_class`
allows to load a specific class (that extends the `IndexCoomponentDiscovery` class)
instead of the default.

As part of the refactor to make this new customisability easier, this
introduces a new `ComponentsBuildId` class that is the pair of the
version and generation of a group of components, since this is what
ultimately defines a given component "group" build.
  • Loading branch information
pcmanus committed Oct 15, 2024
1 parent 06c31ee commit e19c101
Show file tree
Hide file tree
Showing 17 changed files with 366 additions and 236 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,10 @@ public enum CassandraRelevantProperties
* Allows custom implementation of {@link org.apache.cassandra.sensors.RequestSensorsFactory} to optionally create
* and configure {@link org.apache.cassandra.sensors.RequestSensors} instances.
*/
REQUEST_SENSORS_FACTORY("cassandra.request_sensors_factory_class");
REQUEST_SENSORS_FACTORY("cassandra.request_sensors_factory_class"),

/** Class used to discover/load the proper SAI index components file for a given sstable. */
CUSTOM_SAI_INDEX_COMPONENTS_DISCOVERY("cassandra.sai.custom_components_discovery_class");

CassandraRelevantProperties(String key, String defaultVal)
{
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/index/sai/SSTableContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private SSTableContext(SSTableContext copy)
@SuppressWarnings("resource")
public static SSTableContext create(SSTableReader sstable, IndexComponents.ForRead perSSTableComponents)
{
var onDiskFormat = perSSTableComponents.version().onDiskFormat();
var onDiskFormat = perSSTableComponents.onDiskFormat();
PrimaryKey.Factory primaryKeyFactory = onDiskFormat.newPrimaryKeyFactory(sstable.metadata().comparator);

Ref<? extends SSTableReader> sstableRef = null;
Expand Down Expand Up @@ -141,7 +141,7 @@ public PrimaryKeyMap.Factory primaryKeyMapFactory()
*/
public int openFilesPerSSTable()
{
return perSSTableComponents.version().onDiskFormat().openFilesPerSSTable();
return perSSTableComponents.onDiskFormat().openFilesPerSSTable();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private static SSTableContext computeUpdatedContext(SSTableReader reader, @Nulla
// from a complete set, so if it is not complete, it means the previous components have been corrupted, and
// we want to use the new one (a rebuild)).
// 3. it uses "up-to-date" per-sstable components.
if (previousContext != null && previousContext.usedPerSSTableComponents().isComplete() && previousContext.usedPerSSTableComponents().hasSameVersionAndGenerationThan(perSSTableComponents))
if (previousContext != null && previousContext.usedPerSSTableComponents().isComplete() && previousContext.usedPerSSTableComponents().buildId().equals(perSSTableComponents.buildId()))
return previousContext;

// Now, if we create a new one, we should close the previous one if it exists.
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/index/sai/SSTableIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private static SearchableIndex createSearchableIndex(SSTableContext sstableConte
return new EmptyIndex();
}

return perIndexComponents.version().onDiskFormat().newSearchableIndex(sstableContext, perIndexComponents);
return perIndexComponents.onDiskFormat().newSearchableIndex(sstableContext, perIndexComponents);
}

public IndexContext getIndexContext()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.cassandra.db.rows.DeserializationHelper;
import org.apache.cassandra.index.SecondaryIndexBuilder;
import org.apache.cassandra.index.sai.disk.StorageAttachedIndexWriter;
import org.apache.cassandra.index.sai.disk.format.ComponentsBuildId;
import org.apache.cassandra.index.sai.disk.format.IndexComponents;
import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
import org.apache.cassandra.index.sai.disk.format.Version;
Expand Down Expand Up @@ -309,16 +310,16 @@ private static void prepareForRebuild(IndexComponents.ForRead components, Set<Co
// The current components are "replaced" (by "other" components) if the build create different components than
// the existing ones. This will happen in the following cases:
// 1. if we use immutable components, that's the point of immutable components.
// 2. when we do not use immutable components, there is still 2 cases where this will happen:
// 2. when we do not use immutable components, the rebuild components will always be for the latest version and
// for generation 0, so if the current components are not for that specific built, then we won't be rebuilding
// the exact same components, and we're "replacing", not "overwriting" ()
// a) the old components are from an older version: a new build will alawys be for `Version.latest()` and
// so will create new files in that case.
// b) the old components are from a non-0 generation: a new build will always be for generation 0 and so
// here again new files will be created. Note that "normally" we should not have non-0 generation in the
// so will create new files in that case (Note that "normally" we should not have non-0 generation in the
// first place if immutable components are not used, but we handle this case to better support "downgrades"
// where immutable components was enabled, but then disabled for some reason. If that happens, we still
// want to ensure a new build removes the old files both from disk (happens below) and from the sstable TOC
// (which is what `replacedComponents` is about).
if (components.version().useImmutableComponentFiles() || !components.version().equals(Version.latest()) || components.generation() != 0)
// (which is what `replacedComponents` is about)).
if (components.version().useImmutableComponentFiles() || !components.buildId().equals(ComponentsBuildId.forNewSSTable()))
replacedComponents.addAll(components.allAsCustomComponents());

if (!components.version().useImmutableComponentFiles())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public static RangeIterator create(SSTableContext ctx, AbstractBounds<PartitionP
TableMetadata metadata = ctx.sstable().metadata();
// if not row-aware, we don't have clustering
var perSSTableComponents = ctx.usedPerSSTableComponents();
if (perSSTableComponents.version().onDiskFormat().indexFeatureSet().isRowAware() && metadata.hasStaticColumns())
if (perSSTableComponents.onDiskFormat().indexFeatureSet().isRowAware() && metadata.hasStaticColumns())
filter = KeyFilter.KEYS_WITH_CLUSTERING;
else // the table doesn't consist anything we want to filter out, so let's use the cheap option
filter = KeyFilter.ALL;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.index.sai.disk.format;

import java.util.Objects;
import java.util.function.Predicate;

import javax.annotation.Nullable;

import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.index.sai.IndexContext;

/**
* Identifies a particular build of a per-sstable or per-index group of SAI index components, aka a pair of the
* {@link Version} built and the generation.
*/
public class ComponentsBuildId implements Comparable<ComponentsBuildId>
{
private static final ComponentsBuildId FOR_NEW_SSTABLE = ComponentsBuildId.latest(0);

private final Version version;
private final int generation;

private ComponentsBuildId(Version version, int generation)
{
this.version = version;
this.generation = generation;
}

public static ComponentsBuildId of(Version version, int generation)
{
return new ComponentsBuildId(version, generation);
}

public static ComponentsBuildId latest(int generation)
{
return of(Version.latest(), generation);
}

public static ComponentsBuildId forNewSSTable()
{
return FOR_NEW_SSTABLE;
}

public static ComponentsBuildId forNewBuild(@Nullable ComponentsBuildId previousBuild, Predicate<ComponentsBuildId> newBuildIsUsablePredicate)
{
Version version = Version.latest();
// If we're not using immutable components, we always use generation 0, and we're fine if that overrides existing files
if (!version.useImmutableComponentFiles())
return new ComponentsBuildId(version, 0);

// Otherwise, if there is no previous build or the new build is for a new version, then we can "tentatively"
// use generation 0, but if not, we need to bump the generation.
int generation = previousBuild != null && previousBuild.version.equals(version) ? previousBuild.generation + 1 : 0;
var candidate = new ComponentsBuildId(version, generation);

// Usually, the candidate above is fine, but we want to avoid overriding existing file (it's theoretically
// possible that the next generation was created at some other point, but then corrupted, and so we falled back
// on the previous generation but some of those file for the next generation still exists). So we check,
// repeatedly if that candidate is usable, incrementing the generation until we find one which is.
while (!newBuildIsUsablePredicate.test(candidate))
candidate = new ComponentsBuildId(version, ++generation);

return candidate;
}

public Version version()
{
return version;
}

public int generation()
{
return generation;
}

public String formatAsComponent(IndexComponentType indexComponentType, IndexContext indexContext)
{
return version.fileNameFormatter().format(indexComponentType, indexContext, generation);
}

@Override
public boolean equals(Object obj)
{
if (!(obj instanceof ComponentsBuildId))
return false;
ComponentsBuildId that = (ComponentsBuildId) obj;
return this.version.equals(that.version) && this.generation == that.generation;
}

@Override
public int hashCode()
{
return Objects.hash(version, generation);
}

@Override
public int compareTo(ComponentsBuildId that)
{
if (this.version.equals(that.version))
return Integer.compare(generation, that.generation);

return this.version.onOrAfter(that.version) ? 1 : -1;
}

@Override
public String toString()
{
return version + "@" + generation;
}

}
Loading

0 comments on commit e19c101

Please sign in to comment.