Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DSP-23956: DSE 5.1 upgradability #1314

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/SerializationHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ public SerializationHeader toHeader(String descriptor, TableMetadata metadata, V
// deserialization. The column will be ignored later on anyway.
column = metadata.getDroppedColumn(name, isStatic);
if (column == null)
throw new UnknownColumnException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization");
throw new UnknownColumnException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization of " + metadata.keyspace + '.' + metadata.name);
bereng marked this conversation as resolved.
Show resolved Hide resolved
}
builder.add(column);
}
Expand Down
52 changes: 44 additions & 8 deletions src/java/org/apache/cassandra/nodes/Nodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,16 @@ public class Nodes
private final Peers peers;
private final Local local;

public Peers getPeers()
{
return peers;
}

public Local getLocal()
{
return local;
}

/**
* Convenience method to retrieve the production code singleton.
*/
Expand Down Expand Up @@ -267,7 +277,8 @@ protected <R> R map(InetAddressAndPort peer, Function<NodeInfo, R> mapper)
return map(peer, mapper, () -> null);
}

private Nodes(Path storageDirectory)
@VisibleForTesting
public Nodes(Path storageDirectory)
{
this.updateExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("nodes-info-persistence"));
this.baseDirectory = storageDirectory;
Expand Down Expand Up @@ -417,7 +428,8 @@ private void maybeCleanupPendingTransaction(Path originalPath, Path backupPath,
}
}

private List<Path> listSnapshots()
@VisibleForTesting
public List<Path> listSnapshots()
{
if (!Files.isDirectory(snapshotsDirectory))
return Collections.emptyList();
Expand Down Expand Up @@ -451,9 +463,11 @@ public final class Local
private final Path localPath;
private final Path localBackupPath;
private final Path localTempPath;
private long lastLocalSave;

private volatile LocalInfo localInfo;
private volatile boolean closed;
private final boolean localInfoPresent;

public Local(ObjectMapper objectMapper, Path storageDirectory)
{
Expand All @@ -464,6 +478,8 @@ public Local(ObjectMapper objectMapper, Path storageDirectory)
localReader = objectMapper.readerFor(LocalInfo.class);
localWriter = objectMapper.writerFor(LocalInfo.class);

localInfoPresent = Files.exists(localPath);

try
{
localInfo = transactionalRead(localPath, localBackupPath, localTempPath, localReader::readValue, LocalInfo::new);
Expand Down Expand Up @@ -492,10 +508,17 @@ private synchronized void snapshot(Path snapshotPath) throws IOException

private void write(OutputStream output) throws IOException
{
lastLocalSave = System.nanoTime();
bereng marked this conversation as resolved.
Show resolved Hide resolved

localInfo.resetDirty();
localWriter.writeValue(output, localInfo);
}

public long getLastSaveTimeNanos()
{
return lastLocalSave;
}

private void close()
{
closed = true;
Expand All @@ -513,6 +536,11 @@ private void resetUnsafe()
saveToDisk();
}

public boolean localInfoWasPresent()
{
return localInfoPresent;
}

/**
* Update information about the local node.
*
Expand All @@ -533,7 +561,7 @@ public void update(Consumer<LocalInfo> updater, boolean blocking)

if (local.isDirty())
{
Future future = updateExecutor.submit(() -> saveToDisk());
Future future = updateExecutor.submit(this::saveToDisk);
if (blocking)
FBUtilities.waitOnFuture(future);
}
Expand Down Expand Up @@ -562,7 +590,7 @@ public <V> void update(V value, BiConsumer<LocalInfo, V> updater, boolean blocki

if (local.isDirty())
{
Future future = updateExecutor.submit(() -> saveToDisk());
Future future = updateExecutor.submit(this::saveToDisk);
if (blocking)
FBUtilities.waitOnFuture(future);
}
Expand Down Expand Up @@ -623,7 +651,7 @@ public void updateTokens(Collection<Token> tokens)

public Collection<Token> getSavedTokens()
{
return localInfo.getTokens() == null ? Collections.EMPTY_LIST : localInfo.getTokens();
return localInfo.getTokens() == null ? Collections.emptyList() : localInfo.getTokens();
}

public int incrementAndGetGeneration()
Expand Down Expand Up @@ -668,6 +696,7 @@ public final class Peers
private final Path peersPath;
private final Path peersBackupPath;
private final Path peersTempPath;
private long lastPeersSave;

private ConcurrentMap<InetAddressAndPort, PeerInfo> peers;
private volatile boolean closed;
Expand Down Expand Up @@ -709,10 +738,17 @@ private synchronized void snapshot(Path snapshotPath) throws IOException

private void write(OutputStream output) throws IOException
{
lastPeersSave = System.nanoTime();
peers.values().forEach(NodeInfo::resetDirty);
peerWriter.writeValue(output, peers.values());
}

@VisibleForTesting
public long getLastSaveTimeNanos()
szymon-miezal marked this conversation as resolved.
Show resolved Hide resolved
{
return lastPeersSave;
}

private void close()
{
closed = true;
Expand Down Expand Up @@ -753,7 +789,7 @@ public void update(InetAddressAndPort peer, Consumer<PeerInfo> updater)
PeerInfo peerInfo = peers.computeIfAbsent(peer, PeerInfo::new);
updater.accept(peerInfo);
if (peerInfo.isDirty())
updateExecutor.submit(() -> saveToDisk());
updateExecutor.submit(this::saveToDisk);
}

/**
Expand All @@ -772,7 +808,7 @@ public <V> void update(InetAddressAndPort peer, V value, BiConsumer<PeerInfo, V>
PeerInfo peerInfo = peers.computeIfAbsent(peer, PeerInfo::new);
updater.accept(peerInfo, value);
if (peerInfo.isDirty())
updateExecutor.submit(() -> saveToDisk());
updateExecutor.submit(this::saveToDisk);
}

/**
Expand All @@ -789,7 +825,7 @@ public void remove(InetAddressAndPort peer)
throw new IllegalStateException("Nodes instance already closed");

if (peers.remove(peer) != null)
updateExecutor.submit(() -> saveToDisk());
updateExecutor.submit(this::saveToDisk);
}

/**
Expand Down
Loading