Skip to content

Commit

Permalink
Revert primary permits changes and add hook (#120398)
Browse files Browse the repository at this point in the history
We would like to avoid using directly primary permits for hollow
shards. So we revert relevant changes, and add a hook into the
function that gets a primary permit with the purpose of a plugin
being able to extend the behavior.

Relates ES-10537
  • Loading branch information
kingherc authored Jan 20, 2025
1 parent bf92343 commit b3b059c
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.elasticsearch.core.Strings.format;

Expand Down Expand Up @@ -349,4 +350,16 @@ public void afterFilesRestoredFromRepository(IndexShard indexShard) {
}
}
}

@Override
public void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier<ActionListener<Void>> onPermitAcquiredListenerSupplier) {
for (IndexEventListener listener : listeners) {
try {
listener.onAcquirePrimaryOperationPermit(indexShard, onPermitAcquiredListenerSupplier);
} catch (Exception e) {
logger.warn(() -> "[" + indexShard.shardId() + "] failed to invoke the listener on acquiring a primary permit", e);
throw e;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;

import java.util.function.Supplier;

/**
* An index event listener is the primary extension point for plugins and build-in services
* to react / listen to per-index and per-shard events. These listeners are registered per-index
Expand Down Expand Up @@ -190,4 +192,14 @@ default void afterIndexShardRecovery(IndexShard indexShard, ActionListener<Void>
* @param indexShard the shard that is recovering
*/
default void afterFilesRestoredFromRepository(IndexShard indexShard) {}

/**
* Called when a single primary permit is acquired for the given shard (see
* {@link IndexShard#acquirePrimaryOperationPermit(ActionListener, java.util.concurrent.Executor)}).
*
* @param indexShard the shard of which a primary permit is requested
* @param onPermitAcquiredListenerSupplier call this immediately to get a listener when the permit is acquired. The listener must be
* completed in order for the permit to be given to the acquiring operation.
*/
default void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier<ActionListener<Void>> onPermitAcquiredListenerSupplier) {}
}
118 changes: 32 additions & 86 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.replication.PendingReplicationActions;
import org.elasticsearch.action.support.replication.ReplicationResponse;
Expand Down Expand Up @@ -189,7 +190,6 @@
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.index.shard.IndexShard.PrimaryPermitCheck.CHECK_PRIMARY_MODE;

public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {

Expand Down Expand Up @@ -779,28 +779,10 @@ public void relocated(
final String targetAllocationId,
final BiConsumer<ReplicationTracker.PrimaryContext, ActionListener<Void>> consumer,
final ActionListener<Void> listener
) throws IllegalIndexShardStateException, IllegalStateException {
relocated(targetNodeId, targetAllocationId, consumer, listener, null);
}

/**
* Provides an variant of {@link IndexShard#relocated(String, String, BiConsumer, ActionListener, Releasable)} with an option
* to relocate the shard under externally acquired primary permits.
*
* @param acquiredPrimaryPermits if null, waits until all the primary permits are acquired, otherwise it calls the consumer immediately
*/
public void relocated(
final String targetNodeId,
final String targetAllocationId,
final BiConsumer<ReplicationTracker.PrimaryContext, ActionListener<Void>> consumer,
final ActionListener<Void> listener,
@Nullable final Releasable acquiredPrimaryPermits
) throws IllegalIndexShardStateException, IllegalStateException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
assert acquiredPrimaryPermits == null || indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
: "external primary permits are provided but not held by the shard";
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
ActionListener<Releasable> onAcquired = new ActionListener<>() {
indexShardOperationPermits.blockOperations(new ActionListener<>() {
@Override
public void onResponse(Releasable releasable) {
boolean success = false;
Expand Down Expand Up @@ -878,13 +860,8 @@ public void onFailure(Exception e) {
listener.onFailure(e);
}
}
};
if (acquiredPrimaryPermits == null) {
// Wait on current thread because this execution is wrapped by CancellableThreads and we want to be able to interrupt it
indexShardOperationPermits.blockOperations(onAcquired, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE);
} else {
ActionListener.completeWith(onAcquired, () -> acquiredPrimaryPermits);
}
}, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); // Wait on current thread because this execution is wrapped by
// CancellableThreads and we want to be able to interrupt it
}
}

Expand Down Expand Up @@ -3592,100 +3569,69 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
);
}

/**
* Check to run before running the primary permit operation
*/
public enum PrimaryPermitCheck {
CHECK_PRIMARY_MODE,
/**
* IMPORTANT: Currently intented to be used only for acquiring primary permits during the recovery of hollow shards.
* Don't disable primary mode checks unless you're really sure.
*/
NONE
}

/**
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
* ActionListener will then be called using the provided executor.
*/
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, Executor executorOnDelay) {
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, CHECK_PRIMARY_MODE);
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false);
}

public void acquirePrimaryOperationPermit(
ActionListener<Releasable> onPermitAcquired,
Executor executorOnDelay,
boolean forceExecution
) {
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, forceExecution, CHECK_PRIMARY_MODE);
}

public void acquirePrimaryOperationPermit(
ActionListener<Releasable> onPermitAcquired,
Executor executorOnDelay,
boolean forceExecution,
PrimaryPermitCheck primaryPermitCheck
) {
verifyNotClosed();
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;
indexShardOperationPermits.acquire(
wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired),
executorOnDelay,
forceExecution
);

ActionListener<Releasable> onPermitAcquiredWrapped = onPermitAcquired.delegateFailureAndWrap((delegate, releasable) -> {
final ActionListener<Releasable> wrappedListener = indexShardOperationPermits.wrapContextPreservingActionListener(
delegate,
executorOnDelay,
forceExecution
);
try (var listeners = new RefCountingListener(wrappedListener.map(unused -> releasable))) {
indexEventListener.onAcquirePrimaryOperationPermit(this, () -> listeners.acquire());
}
});

indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquiredWrapped), executorOnDelay, forceExecution);
}

public boolean isPrimaryMode() {
assert indexShardOperationPermits.getActiveOperationsCount() != 0 : "must hold permit to check primary mode";
return replicationTracker.isPrimaryMode();
}

public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
acquireAllPrimaryOperationsPermits(onPermitAcquired, timeout, CHECK_PRIMARY_MODE);
}

/**
* Acquire all primary operation permits. Once all permits are acquired, the provided ActionListener is called.
* It is the responsibility of the caller to close the {@link Releasable}.
*/
public void acquireAllPrimaryOperationsPermits(
final ActionListener<Releasable> onPermitAcquired,
final TimeValue timeout,
final PrimaryPermitCheck primaryPermitCheck
) {
public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
verifyNotClosed();
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;

asyncBlockOperations(
wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired),
timeout.duration(),
timeout.timeUnit()
);
asyncBlockOperations(wrapPrimaryOperationPermitListener(onPermitAcquired), timeout.duration(), timeout.timeUnit());
}

/**
* Wraps the action to run on a primary after acquiring permit.
* Wraps the action to run on a primary after acquiring permit. This wrapping is used to check if the shard is in primary mode before
* executing the action.
*
* @param primaryPermitCheck check to run before the primary mode operation
* @param listener the listener to wrap
* @return the wrapped listener
*/
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(
final PrimaryPermitCheck primaryPermitCheck,
final ActionListener<Releasable> listener
) {
return switch (primaryPermitCheck) {
case CHECK_PRIMARY_MODE -> listener.delegateFailure((l, r) -> {
if (isPrimaryMode()) {
l.onResponse(r);
} else {
r.close();
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
}
});
case NONE -> listener;
};
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(final ActionListener<Releasable> listener) {
return listener.delegateFailure((l, r) -> {
if (isPrimaryMode()) {
l.onResponse(r);
} else {
r.close();
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
}
});
}

private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
Expand Down Expand Up @@ -3723,7 +3669,7 @@ public void runUnderPrimaryPermit(final Runnable runnable, final Consumer<Except
runnable.run();
}
}, onFailure);
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, CHECK_PRIMARY_MODE);
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay);
}

private <E extends Exception> void bumpPrimaryTerm(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,32 +216,7 @@ private void innerAcquire(
try {
synchronized (this) {
if (queuedBlockOperations > 0) {
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
final ActionListener<Releasable> wrappedListener;
if (executorOnDelay != null) {
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired).delegateFailure(
(l, r) -> executorOnDelay.execute(new ActionRunnable<>(l) {
@Override
public boolean isForceExecution() {
return forceExecution;
}

@Override
protected void doRun() {
listener.onResponse(r);
}

@Override
public void onRejection(Exception e) {
IOUtils.closeWhileHandlingException(r);
super.onRejection(e);
}
})
);
} else {
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired);
}
delayedOperations.add(wrappedListener);
delayedOperations.add(wrapContextPreservingActionListener(onAcquired, executorOnDelay, forceExecution));
return;
} else {
releasable = acquire();
Expand All @@ -255,6 +230,39 @@ public void onRejection(Exception e) {
onAcquired.onResponse(releasable);
}

public <T extends Closeable> ActionListener<T> wrapContextPreservingActionListener(
ActionListener<T> listener,
@Nullable final Executor executorOnDelay,
final boolean forceExecution
) {
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
final ActionListener<T> wrappedListener;
if (executorOnDelay != null) {
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, listener).delegateFailure(
(l, r) -> executorOnDelay.execute(new ActionRunnable<>(l) {
@Override
public boolean isForceExecution() {
return forceExecution;
}

@Override
protected void doRun() {
listener.onResponse(r);
}

@Override
public void onRejection(Exception e) {
IOUtils.closeWhileHandlingException(r);
super.onRejection(e);
}
})
);
} else {
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, listener);
}
return wrappedListener;
}

private Releasable acquire() throws InterruptedException {
assert Thread.holdsLock(this);
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -790,21 +790,6 @@ public void onFailure(final Exception e) {
}
}, TimeValue.timeValueSeconds(30));
latch.await();

// It's possible to acquire permits if we skip the primary mode check
var permitAcquiredLatch = new CountDownLatch(1);
indexShard.acquirePrimaryOperationPermit(ActionListener.wrap(r -> {
r.close();
permitAcquiredLatch.countDown();
}, Assert::assertNotNull), EsExecutors.DIRECT_EXECUTOR_SERVICE, false, IndexShard.PrimaryPermitCheck.NONE);
safeAwait(permitAcquiredLatch);

var allPermitsAcquiredLatch = new CountDownLatch(1);
indexShard.acquireAllPrimaryOperationsPermits(ActionListener.wrap(r -> {
r.close();
allPermitsAcquiredLatch.countDown();
}, Assert::assertNotNull), TimeValue.timeValueSeconds(30), IndexShard.PrimaryPermitCheck.NONE);
safeAwait(allPermitsAcquiredLatch);
}

if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) {
Expand Down

0 comments on commit b3b059c

Please sign in to comment.