Skip to content

Commit

Permalink
DSP-23956: Migration of system.local/peers to flat files
Browse files Browse the repository at this point in the history
CC uses flat files to store system.local/peers info and exposes them via virtual tables.
DSE 5.1 uses regular C* sstables for storing these tables. This patch
adds the migration procedure that detects the existence of the mentioned flat
files and rewrites necessary data from sstables.
  • Loading branch information
szymon-miezal committed Oct 2, 2024
1 parent 1fd42c2 commit 6a4c377
Show file tree
Hide file tree
Showing 6 changed files with 865 additions and 11 deletions.
51 changes: 43 additions & 8 deletions src/java/org/apache/cassandra/nodes/Nodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,16 @@ public class Nodes
private final Peers peers;
private final Local local;

public Peers getPeers()
{
return peers;
}

public Local getLocal()
{
return local;
}

/**
* Convenience method to retrieve the production code singleton.
*/
Expand Down Expand Up @@ -267,7 +277,8 @@ protected <R> R map(InetAddressAndPort peer, Function<NodeInfo, R> mapper)
return map(peer, mapper, () -> null);
}

private Nodes(Path storageDirectory)
@VisibleForTesting
public Nodes(Path storageDirectory)
{
this.updateExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("nodes-info-persistence"));
this.baseDirectory = storageDirectory;
Expand Down Expand Up @@ -417,7 +428,8 @@ private void maybeCleanupPendingTransaction(Path originalPath, Path backupPath,
}
}

private List<Path> listSnapshots()
@VisibleForTesting
public List<Path> listSnapshots()
{
if (!Files.isDirectory(snapshotsDirectory))
return Collections.emptyList();
Expand Down Expand Up @@ -451,9 +463,11 @@ public final class Local
private final Path localPath;
private final Path localBackupPath;
private final Path localTempPath;
private long lastLocalSave;

private volatile LocalInfo localInfo;
private volatile boolean closed;
private final boolean localInfoPresent;

public Local(ObjectMapper objectMapper, Path storageDirectory)
{
Expand All @@ -464,6 +478,8 @@ public Local(ObjectMapper objectMapper, Path storageDirectory)
localReader = objectMapper.readerFor(LocalInfo.class);
localWriter = objectMapper.writerFor(LocalInfo.class);

localInfoPresent = Files.exists(localPath);

try
{
localInfo = transactionalRead(localPath, localBackupPath, localTempPath, localReader::readValue, LocalInfo::new);
Expand Down Expand Up @@ -492,10 +508,17 @@ private synchronized void snapshot(Path snapshotPath) throws IOException

private void write(OutputStream output) throws IOException
{
lastLocalSave = System.nanoTime();

localInfo.resetDirty();
localWriter.writeValue(output, localInfo);
}

public long getLastSaveTimeNanos()
{
return lastLocalSave;
}

private void close()
{
closed = true;
Expand All @@ -513,6 +536,11 @@ private void resetUnsafe()
saveToDisk();
}

public boolean localInfoWasPresent()
{
return localInfoPresent;
}

/**
* Update information about the local node.
*
Expand All @@ -533,7 +561,7 @@ public void update(Consumer<LocalInfo> updater, boolean blocking)

if (local.isDirty())
{
Future future = updateExecutor.submit(() -> saveToDisk());
Future future = updateExecutor.submit(this::saveToDisk);
if (blocking)
FBUtilities.waitOnFuture(future);
}
Expand Down Expand Up @@ -562,7 +590,7 @@ public <V> void update(V value, BiConsumer<LocalInfo, V> updater, boolean blocki

if (local.isDirty())
{
Future future = updateExecutor.submit(() -> saveToDisk());
Future future = updateExecutor.submit(this::saveToDisk);
if (blocking)
FBUtilities.waitOnFuture(future);
}
Expand Down Expand Up @@ -623,7 +651,7 @@ public void updateTokens(Collection<Token> tokens)

public Collection<Token> getSavedTokens()
{
return localInfo.getTokens() == null ? Collections.EMPTY_LIST : localInfo.getTokens();
return localInfo.getTokens() == null ? Collections.emptyList() : localInfo.getTokens();
}

public int incrementAndGetGeneration()
Expand Down Expand Up @@ -668,6 +696,7 @@ public final class Peers
private final Path peersPath;
private final Path peersBackupPath;
private final Path peersTempPath;
private long lastPeersSave;

private ConcurrentMap<InetAddressAndPort, PeerInfo> peers;
private volatile boolean closed;
Expand Down Expand Up @@ -709,10 +738,16 @@ private synchronized void snapshot(Path snapshotPath) throws IOException

private void write(OutputStream output) throws IOException
{
lastPeersSave = System.nanoTime();
peers.values().forEach(NodeInfo::resetDirty);
peerWriter.writeValue(output, peers.values());
}

public long getLastSaveTimeNanos()
{
return lastPeersSave;
}

private void close()
{
closed = true;
Expand Down Expand Up @@ -753,7 +788,7 @@ public void update(InetAddressAndPort peer, Consumer<PeerInfo> updater)
PeerInfo peerInfo = peers.computeIfAbsent(peer, PeerInfo::new);
updater.accept(peerInfo);
if (peerInfo.isDirty())
updateExecutor.submit(() -> saveToDisk());
updateExecutor.submit(this::saveToDisk);
}

/**
Expand All @@ -772,7 +807,7 @@ public <V> void update(InetAddressAndPort peer, V value, BiConsumer<PeerInfo, V>
PeerInfo peerInfo = peers.computeIfAbsent(peer, PeerInfo::new);
updater.accept(peerInfo, value);
if (peerInfo.isDirty())
updateExecutor.submit(() -> saveToDisk());
updateExecutor.submit(this::saveToDisk);
}

/**
Expand All @@ -789,7 +824,7 @@ public void remove(InetAddressAndPort peer)
throw new IllegalStateException("Nodes instance already closed");

if (peers.remove(peer) != null)
updateExecutor.submit(() -> saveToDisk());
updateExecutor.submit(this::saveToDisk);
}

/**
Expand Down
Loading

0 comments on commit 6a4c377

Please sign in to comment.