diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterZoneCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterZoneCommand.java index 872c6ac0887..068567da1e7 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterZoneCommand.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterZoneCommand.java @@ -109,7 +109,7 @@ public List get(Catalog catalog) { CatalogZoneDescriptor descriptor = fromParamsAndPreviousValue(zone); - return List.of(new AlterZoneEntry(descriptor)); + return List.of(new AlterZoneEntry(descriptor, zone)); } private CatalogZoneDescriptor fromParamsAndPreviousValue(CatalogZoneDescriptor previous) { diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/RenameZoneCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/RenameZoneCommand.java index c6c2748d1d0..5813fea5d5a 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/RenameZoneCommand.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/RenameZoneCommand.java @@ -85,7 +85,7 @@ public List get(Catalog catalog) { zone.consistencyMode() ); - return List.of(new AlterZoneEntry(descriptor)); + return List.of(new AlterZoneEntry(descriptor, zone)); } private void validate() { diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterZoneEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterZoneEventParameters.java index 7fe598fa195..4c1737aabcb 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterZoneEventParameters.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterZoneEventParameters.java @@ -26,17 +26,26 @@ public class AlterZoneEventParameters extends CatalogEventParameters { private final CatalogZoneDescriptor zoneDescriptor; + private final CatalogZoneDescriptor previousDescriptor; + /** * Constructor. * * @param causalityToken Causality token. * @param catalogVersion Catalog version. * @param zoneDescriptor Newly created distribution zone descriptor. + * @param previousDescriptor Previous distribution zone descriptor. */ - public AlterZoneEventParameters(long causalityToken, int catalogVersion, CatalogZoneDescriptor zoneDescriptor) { + public AlterZoneEventParameters( + long causalityToken, + int catalogVersion, + CatalogZoneDescriptor zoneDescriptor, + CatalogZoneDescriptor previousDescriptor + ) { super(causalityToken, catalogVersion); this.zoneDescriptor = zoneDescriptor; + this.previousDescriptor = previousDescriptor; } /** @@ -45,4 +54,11 @@ public AlterZoneEventParameters(long causalityToken, int catalogVersion, Catalog public CatalogZoneDescriptor zoneDescriptor() { return zoneDescriptor; } + + /** + * Gets previous distribution zone descriptor. + */ + public CatalogZoneDescriptor previousDescriptor() { + return previousDescriptor; + } } diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java index c9bbf229af3..9cf1f930cbc 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java @@ -40,13 +40,17 @@ public class AlterZoneEntry implements UpdateEntry, Fireable { private final CatalogZoneDescriptor descriptor; + private final CatalogZoneDescriptor previousDescriptor; + /** * Constructs the object. * * @param descriptor A descriptor of a zone to alter. + * @param previousDescriptor A previous descriptor of a zone. */ - public AlterZoneEntry(CatalogZoneDescriptor descriptor) { + public AlterZoneEntry(CatalogZoneDescriptor descriptor, CatalogZoneDescriptor previousDescriptor) { this.descriptor = descriptor; + this.previousDescriptor = previousDescriptor; } /** Returns descriptor of a zone to alter. */ @@ -54,6 +58,11 @@ public CatalogZoneDescriptor descriptor() { return descriptor; } + /** Returns previous descriptor of a zone. */ + public CatalogZoneDescriptor previousDescriptor() { + return previousDescriptor; + } + @Override public int typeId() { return MarshallableEntryType.ALTER_ZONE.id(); @@ -66,7 +75,7 @@ public CatalogEvent eventType() { @Override public CatalogEventParameters createEventParameters(long causalityToken, int catalogVersion) { - return new AlterZoneEventParameters(causalityToken, catalogVersion, descriptor); + return new AlterZoneEventParameters(causalityToken, catalogVersion, descriptor, previousDescriptor); } @Override @@ -97,13 +106,15 @@ private static class AlterZoneEntrySerializer implements CatalogObjectSerializer @Override public AlterZoneEntry readFrom(IgniteDataInput input) throws IOException { CatalogZoneDescriptor descriptor = CatalogZoneDescriptor.SERIALIZER.readFrom(input); + CatalogZoneDescriptor previousDescriptor = CatalogZoneDescriptor.SERIALIZER.readFrom(input); - return new AlterZoneEntry(descriptor); + return new AlterZoneEntry(descriptor, previousDescriptor); } @Override public void writeTo(AlterZoneEntry object, IgniteDataOutput output) throws IOException { CatalogZoneDescriptor.SERIALIZER.writeTo(object.descriptor(), output); + CatalogZoneDescriptor.SERIALIZER.writeTo(object.previousDescriptor(), output); } } } diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/CatalogEntrySerializationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/CatalogEntrySerializationTest.java index 1254fd2ae6d..e0d8b348532 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/CatalogEntrySerializationTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/CatalogEntrySerializationTest.java @@ -255,7 +255,7 @@ public void testFunctionCallDefault() throws IOException { private void checkAlterZoneEntry() { CatalogStorageProfilesDescriptor profiles = new CatalogStorageProfilesDescriptor(List.of(new CatalogStorageProfileDescriptor("default"))); - UpdateEntry entry1 = new AlterZoneEntry(newCatalogZoneDescriptor("zone1", profiles)); + UpdateEntry entry1 = new AlterZoneEntry(newCatalogZoneDescriptor("zone1", profiles), newCatalogZoneDescriptor("zone0", profiles)); VersionedUpdate update = newVersionedUpdate(entry1, entry1); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/lang/Pair.java b/modules/core/src/main/java/org/apache/ignite/internal/lang/Pair.java new file mode 100644 index 00000000000..0cf9e0690dd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/lang/Pair.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.lang; + +import java.util.Objects; + +/** + * Pair of objects. + * + * @param First object. + * @param Second object. + */ +public class Pair { + /** First obj. */ + private final T first; + + /** Second obj. */ + private final V second; + + /** + * Constructor. + * + * @param first First object. + * @param second Second object. + */ + public Pair(T first, V second) { + this.first = first; + this.second = second; + } + + public static Pair pair(T first, V second) { + return new Pair<>(first, second); + } + + /** + * Get the first object. + */ + public T getFirst() { + return first; + } + + /** + * Get the second object. + */ + public V getSecond() { + return second; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Pair pair = (Pair) o; + return Objects.equals(first, pair.first) && Objects.equals(second, pair.second); + } + + @Override + public int hashCode() { + return Objects.hash(first, second); + } + + @Override + public String toString() { + return "Pair [" + first + ", " + second + ']'; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java index 93d449e2700..32e99bb6c9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java @@ -29,7 +29,11 @@ import java.time.LocalTime; import java.time.Period; import java.util.BitSet; +import java.util.Collection; +import java.util.Map; import java.util.UUID; +import java.util.function.Function; +import java.util.function.Supplier; /** * Extended data input. @@ -418,4 +422,58 @@ interface Materializer { */ T materialize(byte[] buffer, int offset, int length); } + + /** + * Reads a collection. + * + * @param collectionSupplier Supplier to create a collection. + * @param elementReader Function to read an element. + * @return Collection. + */ + default > C readCollection( + Supplier collectionSupplier, + ObjectReader elementReader + ) throws IOException { + int size = readVarIntAsInt(); + + C collection = collectionSupplier.get(); + + for (int i = 0; i < size; i++) { + collection.add(elementReader.read(this)); + } + + return collection; + } + + /** + * Reads a map. + * + * @param mapSupplier Supplier to create a map. + * @param keyReader Function to read a key. + * @param valueReader Function to read a value. + * @return Map. + */ + default > M readMap( + Supplier mapSupplier, + ObjectReader keyReader, + ObjectReader valueReader + ) throws IOException { + int size = readVarIntAsInt(); + + M map = mapSupplier.get(); + + for (int i = 0; i < size; i++) { + K key = keyReader.read(this); + VV value = valueReader.read(this); + + map.put(key, value); + } + + return map; + } + + @FunctionalInterface + interface ObjectReader { + T read(IgniteDataInput in) throws IOException; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java index befdacf0367..ff2069f0405 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java @@ -29,7 +29,10 @@ import java.time.LocalTime; import java.time.Period; import java.util.BitSet; +import java.util.Collection; +import java.util.Map; import java.util.UUID; +import java.util.function.BiConsumer; /** * Extended data output. @@ -234,4 +237,40 @@ public interface IgniteDataOutput extends DataOutput { * @throws IOException if something went wrong */ void flush() throws IOException; + + /** + * Writes a collection. + * + * @param collection Collection. + * @param elementWriter Element writer. + */ + default void writeCollection(Collection collection, ObjectWriter elementWriter) throws IOException { + writeVarInt(collection.size()); + + for (T e : collection) { + elementWriter.write(e, this); + } + } + + /** + * Writes a map. + * + * @param map Map. + * @param keyWriter Key writer. + * @param valWriter Value writer. + */ + default void writeMap(Map map, ObjectWriter keyWriter, ObjectWriter valWriter) + throws IOException { + writeVarInt(map.size()); + + for (Map.Entry e : map.entrySet()) { + keyWriter.write(e.getKey(), this); + valWriter.write(e.getValue(), this); + } + } + + @FunctionalInterface + interface ObjectWriter { + void write(T obj, IgniteDataOutput out) throws IOException; + } } diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java index f51ea315297..ef84d450b97 100644 --- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java +++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java @@ -38,7 +38,6 @@ import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKeyPrefix; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLastHandledTopology; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesRecoverableStateRevision; -import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE; import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.defaultChannelTypeRegistry; import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.defaultSerializationRegistry; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; @@ -74,7 +73,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.stream.Stream; import org.apache.ignite.configuration.validation.Validator; import org.apache.ignite.internal.BaseIgniteRestartTest; @@ -127,7 +125,6 @@ import org.apache.ignite.internal.testframework.ExecutorServiceExtension; import org.apache.ignite.internal.testframework.InjectExecutorService; import org.apache.ignite.internal.testframework.TestIgnitionManager; -import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.worker.fixtures.NoOpCriticalWorkerRegistry; import org.apache.ignite.network.NetworkAddress; @@ -290,16 +287,12 @@ private PartialNode startPartialNode(int idx) { var catalogManager = new CatalogManagerImpl(new UpdateLogImpl(metastore), new TestClockService(clock, clockWaiter)); - ScheduledExecutorService rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE, - NamedThreadFactory.create(name, "test-rebalance-scheduler", logger())); - DistributionZoneManager distributionZoneManager = new DistributionZoneManager( name, revisionUpdater, metastore, logicalTopologyService, catalogManager, - rebalanceScheduler, clusterCfgMgr.configurationRegistry().getConfiguration(SystemDistributedExtensionConfiguration.KEY).system() ); diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 383308447c4..41989ae59c8 100644 --- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -1462,7 +1462,6 @@ private class Node { metaStorageManager, logicalTopologyService, catalogManager, - rebalanceScheduler, systemDistributedConfiguration ); diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java new file mode 100644 index 00000000000..e4846ffea2a --- /dev/null +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java @@ -0,0 +1,674 @@ +package org.apache.ignite.internal.distributionzones; + +import static java.lang.Math.max; +import static java.util.Collections.emptySet; +import static java.util.Optional.ofNullable; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toSet; +import static org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE; +import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.createZoneManagerExecutor; +import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes; +import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesHistoryKey; +import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonePartitionResetTimerKey; +import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownTimerKey; +import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpTimerKey; +import static org.apache.ignite.internal.lang.Pair.pair; +import static org.apache.ignite.internal.metastorage.dsl.Conditions.and; +import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; +import static org.apache.ignite.internal.metastorage.dsl.Conditions.or; +import static org.apache.ignite.internal.metastorage.dsl.Conditions.value; +import static org.apache.ignite.internal.metastorage.dsl.Operations.ops; +import static org.apache.ignite.internal.metastorage.dsl.Operations.put; +import static org.apache.ignite.internal.metastorage.dsl.Statements.iif; +import static org.apache.ignite.internal.util.CollectionUtils.union; +import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.lang.ByteArray; +import org.apache.ignite.internal.lang.Pair; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.metastorage.Entry; +import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.dsl.Condition; +import org.apache.ignite.internal.metastorage.dsl.Iif; +import org.apache.ignite.internal.metastorage.dsl.Operation; +import org.apache.ignite.internal.thread.NamedThreadFactory; +import org.apache.ignite.internal.thread.StripedScheduledThreadPoolExecutor; +import org.apache.ignite.internal.util.io.IgniteDataInput; +import org.apache.ignite.internal.util.io.IgniteDataOutput; +import org.apache.ignite.internal.versioned.VersionedSerialization; +import org.apache.ignite.internal.versioned.VersionedSerializer; +import org.jetbrains.annotations.Nullable; + +/** + * Manager for data nodes of distribution zones. + */ +public class DataNodesManager { + /** The logger. */ + private static final IgniteLogger LOG = Loggers.forClass(DataNodesManager.class); + + private static final DistributionZoneTimer DEFAULT_TIMER = new DistributionZoneTimer(HybridTimestamp.MIN_VALUE, Set.of()); + + private final MetaStorageManager metaStorageManager; + + /** + * Map with zone id as a key and set of zone timers as a value. + */ + private final Map zoneTimers = new ConcurrentHashMap<>(); + + /** Executor for scheduling tasks for scale up and scale down processes. */ + private final StripedScheduledThreadPoolExecutor executor; + + public DataNodesManager(String nodeName, MetaStorageManager metaStorageManager) { + this.metaStorageManager = metaStorageManager; + + executor = createZoneManagerExecutor( + Math.min(Runtime.getRuntime().availableProcessors() * 3, 20), + NamedThreadFactory.create(nodeName, "dst-zones-scheduler", LOG) + ); + } + + void start( + Collection knownZones, + Set logicalTopology + ) { + for (CatalogZoneDescriptor zone : knownZones) { + onScaleUpTimerChange(zone, logicalTopology); + onScaleDownTimerChange(zone, logicalTopology); + } + } + + void stop() { + zoneTimers.forEach((k, zt) -> zt.stopAllTimers()); + + shutdownAndAwaitTermination(executor, 10, SECONDS); + } + + public CompletableFuture onTopologyChangeZoneHandler( + CatalogZoneDescriptor zoneDescriptor, + HybridTimestamp timestamp, + Set oldLogicalTopology, + Set newLogicalTopology + ) { + return msInvokeWithRetry(() -> { + int zoneId = zoneDescriptor.id(); + + DataNodesHistory dataNodesHistory = dataNodesHistory(zoneId); + + if (dataNodesHistory.history.containsKey(timestamp)) { + return null; + } + + DistributionZoneTimer scaleUpTimer = scaleUpTimer(zoneId); + DistributionZoneTimer scaleDownTimer = scaleDownTimer(zoneId); + + Pair> currentDataNodes = currentDataNodes( + timestamp, + dataNodesHistory, + oldLogicalTopology, + scaleUpTimer, + scaleDownTimer, + zoneDescriptor + ); + + Set filteredNewTopology = filterDataNodes(newLogicalTopology, zoneDescriptor); + + Set addedNodes = filteredNewTopology.stream() + .filter(node -> !currentDataNodes.getSecond().contains(node)) + .collect(toSet()); + + Set removedNodes = currentDataNodes.getSecond().stream() + .filter(node -> !newLogicalTopology.contains(node)) + .collect(toSet()); + + if ((!addedNodes.isEmpty() || !removedNodes.isEmpty()) && zoneDescriptor.dataNodesAutoAdjust() != INFINITE_TIMER_VALUE) { + // TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust timer. + throw new UnsupportedOperationException("Data nodes auto adjust is not supported."); + } + + // TODO deal with partition distribution reset + + DistributionZoneTimer newScaleUpTimer = new DistributionZoneTimer( + timestamp.addPhysicalTime(zoneDescriptor.dataNodesAutoAdjustScaleUp() * 1000L), + union(addedNodes, scaleUpTimer.timestamp.longValue() < timestamp.longValue() ? emptySet() : scaleUpTimer.nodes) + ); + + DistributionZoneTimer newScaleDownTimer = new DistributionZoneTimer( + timestamp.addPhysicalTime(zoneDescriptor.dataNodesAutoAdjustScaleDown() * 1000L), + union(removedNodes, scaleDownTimer.timestamp.longValue() < timestamp.longValue() ? emptySet() : scaleDownTimer.nodes) + ); + + return iif( + and( + dataNodesHistoryEqualToOrNotExists(zoneId, dataNodesHistory), + and( + timerEqualToOrNotExists(zoneScaleUpTimerKey(zoneId), scaleUpTimer), + timerEqualToOrNotExists(zoneScaleDownTimerKey(zoneId), scaleDownTimer) + ) + ), + ops( + addNewEntryToDataNodesHistory(zoneId, dataNodesHistory, timestamp, currentDataNodes.getSecond()), + renewTimer(zoneScaleUpTimerKey(zoneId), newScaleUpTimer), + renewTimer(zoneScaleDownTimerKey(zoneId), newScaleDownTimer) + ).yield(true), + ops().yield(false) + ); + }); + } + + public CompletableFuture onZoneFilterChange( + CatalogZoneDescriptor zoneDescriptor, + HybridTimestamp timestamp, + Set logicalTopology + ) { + return msInvokeWithRetry(() -> { + int zoneId = zoneDescriptor.id(); + + DataNodesHistory dataNodesHistory = dataNodesHistory(zoneId); + + if (dataNodesHistory.history.containsKey(timestamp)) { + return null; + } + + Set dataNodes = filterDataNodes(logicalTopology, zoneDescriptor); + + return iif( + dataNodesHistoryEqualToOrNotExists(zoneId, dataNodesHistory), + ops( + addNewEntryToDataNodesHistory(zoneId, dataNodesHistory, timestamp, dataNodes), + clearTimer(zoneScaleUpTimerKey(zoneId)), + clearTimer(zoneScaleDownTimerKey(zoneId)), + clearTimer(zonePartitionResetTimerKey(zoneId)) + ).yield(true), + ops().yield(false) + ); + }); + } + + public CompletableFuture onAutoAdjustAlteration( + CatalogZoneDescriptor zoneDescriptor, + HybridTimestamp timestamp, + int oldAutoAdjustScaleUp, + int oldAutoAdjustScaleDown, + Set logicalTopology + ) { + return msInvokeWithRetry(() -> { + int zoneId = zoneDescriptor.id(); + + DataNodesHistory dataNodesHistory = dataNodesHistory(zoneId); + + if (dataNodesHistory.history.containsKey(timestamp)) { + return null; + } + + DistributionZoneTimer scaleUpTimer = scaleUpTimer(zoneId); + DistributionZoneTimer scaleDownTimer = scaleDownTimer(zoneId); + + DistributionZoneTimer newScaleUpTimer = new DistributionZoneTimer( + timestamp + .subtractPhysicalTime(oldAutoAdjustScaleUp * 1000L) + .addPhysicalTime(zoneDescriptor.dataNodesAutoAdjustScaleUp() * 1000L), + scaleUpTimer.nodes + ); + + DistributionZoneTimer newScaleDownTimer = new DistributionZoneTimer( + timestamp + .subtractPhysicalTime(oldAutoAdjustScaleDown * 1000L) + .addPhysicalTime(zoneDescriptor.dataNodesAutoAdjustScaleDown() * 1000L), + scaleUpTimer.nodes + ); + + Pair> currentDataNodes = currentDataNodes( + timestamp, + dataNodesHistory, + logicalTopology, + newScaleUpTimer, + newScaleDownTimer, + zoneDescriptor + ); + + return iif( + and( + dataNodesHistoryEqualToOrNotExists(zoneId, dataNodesHistory), + and( + timerEqualToOrNotExists(zoneScaleUpTimerKey(zoneId), scaleUpTimer), + timerEqualToOrNotExists(zoneScaleDownTimerKey(zoneId), scaleDownTimer) + ) + ), + ops( + addNewEntryToDataNodesHistory(zoneId, dataNodesHistory, timestamp, currentDataNodes.getSecond()), + renewTimerOrClearIfAlreadyApplied(zoneScaleUpTimerKey(zoneId), timestamp, newScaleUpTimer), + renewTimerOrClearIfAlreadyApplied(zoneScaleDownTimerKey(zoneId), timestamp, newScaleDownTimer) + ).yield(true), + ops().yield(false) + ); + }); + } + + public void onUpdatePartitionDistributionReset( + int zoneId, + int partitionDistributionResetTimeoutSeconds, + Runnable task + ) { + if (partitionDistributionResetTimeoutSeconds == INFINITE_TIMER_VALUE) { + zoneTimers.computeIfAbsent(zoneId, ZoneTimers::new).partitionReset.stopScheduledTask(); + } else { + zoneTimers.computeIfAbsent(zoneId, ZoneTimers::new).partitionReset.reschedule(partitionDistributionResetTimeoutSeconds, task); + } + } + + private CompletableFuture onScaleUpTimerChange( + CatalogZoneDescriptor zoneDescriptor, + Set logicalTopology + ) { + int zoneId = zoneDescriptor.id(); + + DistributionZoneTimer scaleUpTimer = scaleUpTimer(zoneId); + + Runnable runnable = () -> msInvokeWithRetry(() -> { + DataNodesHistory dataNodesHistory = dataNodesHistory(zoneId); + + if (dataNodesHistory.history.containsKey(scaleUpTimer.timestamp)) { + return null; + } + + DistributionZoneTimer scaleUpTimer0 = scaleUpTimer(zoneId); + + Pair> currentDataNodes = currentDataNodes( + scaleUpTimer.timestamp, + dataNodesHistory, + logicalTopology, + scaleUpTimer, + DEFAULT_TIMER, + zoneDescriptor + ); + + return iif( + and( + dataNodesHistoryEqualToOrNotExists(zoneId, dataNodesHistory), + timerEqualToOrNotExists(zoneScaleUpTimerKey(zoneId), scaleUpTimer0) + ), + ops( + addNewEntryToDataNodesHistory(zoneId, dataNodesHistory, scaleUpTimer.timestamp, currentDataNodes.getSecond()), + clearTimer(zoneScaleUpTimerKey(zoneId)) + ).yield(true), + ops().yield(false) + ); + }); + + rescheduleScaleUp(delayInSeconds(scaleUpTimer.timestamp), runnable, zoneId); + + return completedFuture(null); + } + + private void onScaleDownTimerChange( + CatalogZoneDescriptor zoneDescriptor, + Set logicalTopology + ) { + int zoneId = zoneDescriptor.id(); + + DistributionZoneTimer scaleDownTimer = scaleDownTimer(zoneId); + + Runnable runnable = () -> msInvokeWithRetry(() -> { + DataNodesHistory dataNodesHistory = dataNodesHistory(zoneId); + + if (dataNodesHistory.history.containsKey(scaleDownTimer.timestamp)) { + return null; + } + + DistributionZoneTimer scaleDownTimer0 = scaleDownTimer(zoneId); + + Pair> currentDataNodes = currentDataNodes( + scaleDownTimer.timestamp, + dataNodesHistory, + logicalTopology, + DEFAULT_TIMER, + scaleDownTimer, + zoneDescriptor + ); + + return iif( + and( + dataNodesHistoryEqualToOrNotExists(zoneId, dataNodesHistory), + timerEqualToOrNotExists(zoneScaleDownTimerKey(zoneId), scaleDownTimer0) + ), + ops( + addNewEntryToDataNodesHistory(zoneId, dataNodesHistory, scaleDownTimer.timestamp, currentDataNodes.getSecond()), + clearTimer(zoneScaleDownTimerKey(zoneId)) + ).yield(true), + ops().yield(false) + ); + }); + + rescheduleScaleDown(delayInSeconds(scaleDownTimer.timestamp), runnable, zoneId); + } + + private void onPartitionResetTimerChange( + CatalogZoneDescriptor zoneDescriptor, + Set logicalTopology + ) { + // TODO + } + + private long delayInSeconds(HybridTimestamp timerTimestamp) { + long currentTime = System.currentTimeMillis(); + + long delayMs = timerTimestamp.getPhysical() - currentTime; + + return max(0, delayMs / 1000); + } + + private void rescheduleScaleUp(long delayInSeconds, Runnable runnable, int zoneId) { + zoneTimers.computeIfAbsent(zoneId, ZoneTimers::new).scaleUp.reschedule(delayInSeconds, runnable); + } + + private void rescheduleScaleDown(long delayInSeconds, Runnable runnable, int zoneId) { + zoneTimers.computeIfAbsent(zoneId, ZoneTimers::new).scaleDown.reschedule(delayInSeconds, runnable); + } + + private void reschedulePartitionReset(long delayInSeconds, Runnable runnable, int zoneId) { + zoneTimers.computeIfAbsent(zoneId, ZoneTimers::new).partitionReset.reschedule(delayInSeconds, runnable); + } + + private static Pair> currentDataNodes( + HybridTimestamp timestamp, + @Nullable DataNodesHistory dataNodesHistory, + Set oldLogicalTopology, + DistributionZoneTimer scaleUpTimer, + DistributionZoneTimer scaleDownTimer, + CatalogZoneDescriptor zoneDescriptor + ) { + Map.Entry> currentDataNodesEntry = dataNodesHistory.history.floorEntry(timestamp); + + Set currentDataNodes; + HybridTimestamp currentDataNodesTimestamp; + + if (currentDataNodesEntry == null) { + currentDataNodesTimestamp = HybridTimestamp.MIN_VALUE; + currentDataNodes = filterDataNodes(oldLogicalTopology, zoneDescriptor); + } else { + currentDataNodesTimestamp = currentDataNodesEntry.getKey(); + currentDataNodes = currentDataNodesEntry.getValue(); + } + + long cdnt = currentDataNodesEntry == null ? HybridTimestamp.MIN_VALUE.longValue() : currentDataNodesEntry.getKey().longValue(); + long sutt = scaleUpTimer.timestamp.longValue(); + long sdtt = scaleDownTimer.timestamp.longValue(); + long timestampLong = timestamp.longValue(); + + if (sutt > cdnt && sutt <= timestampLong) { + currentDataNodesTimestamp = scaleUpTimer.timestamp; + currentDataNodes.addAll(filterDataNodes(scaleUpTimer.nodes, zoneDescriptor)); + } + + if (sdtt > cdnt && sdtt > sutt && sdtt <= timestampLong) { + currentDataNodesTimestamp = scaleDownTimer.timestamp; + currentDataNodes.removeAll(scaleDownTimer.nodes); + } + + return pair(currentDataNodesTimestamp, currentDataNodes); + } + + public CompletableFuture> dataNodes(int zoneId, HybridTimestamp timestamp) { + return getValueFromMetaStorage(zoneDataNodesHistoryKey(zoneId), DataNodesHistorySerializer::deserialize) + .thenApply(history -> { + Map.Entry> entry = history.history.floorEntry(timestamp); + + if (entry == null) { + return emptySet(); + } + + return entry.getValue().stream().map(NodeWithAttributes::nodeName).collect(toSet()); + }); + } + + private DataNodesHistory dataNodesHistory(int zoneId) { + return ofNullable(getValueFromMetaStorageLocally(zoneDataNodesHistoryKey(zoneId), DataNodesHistorySerializer::deserialize)) + .orElse(new DataNodesHistory(new TreeMap<>())); + } + + private DistributionZoneTimer scaleUpTimer(int zoneId) { + return ofNullable(getValueFromMetaStorageLocally(zoneScaleUpTimerKey(zoneId), DistributionZoneTimerSerializer::deserialize)) + .orElse(DEFAULT_TIMER); + } + + private DistributionZoneTimer scaleDownTimer(int zoneId) { + return ofNullable(getValueFromMetaStorageLocally(zoneScaleDownTimerKey(zoneId), DistributionZoneTimerSerializer::deserialize)) + .orElse(DEFAULT_TIMER); + } + + private Condition dataNodesHistoryEqualToOrNotExists(int zoneId, DataNodesHistory history) { + return or( + notExists(zoneDataNodesHistoryKey(zoneId)), + value(zoneDataNodesHistoryKey(zoneId)).eq(DataNodesHistorySerializer.serialize(history)) + ); + } + + private Condition timerEqualToOrNotExists(ByteArray timerKey, DistributionZoneTimer timer) { + return or( + notExists(timerKey), + or( + value(timerKey).eq(DistributionZoneTimerSerializer.serialize(timer)), + value(timerKey).eq(DistributionZoneTimerSerializer.serialize(DEFAULT_TIMER)) + ) + ); + } + + public static Operation addNewEntryToDataNodesHistory( + int zoneId, + DataNodesHistory history, + HybridTimestamp timestamp, + Set nodes + ) { + DataNodesHistory newHistory = new DataNodesHistory(new TreeMap<>(history.history)); + newHistory.history.put(timestamp, nodes); + + return put(zoneDataNodesHistoryKey(zoneId), DataNodesHistorySerializer.serialize(newHistory)); + } + + private static Operation renewTimer(ByteArray timerKey, DistributionZoneTimer timer) { + return put(timerKey, DistributionZoneTimerSerializer.serialize(timer)); + } + + private static Operation renewTimerOrClearIfAlreadyApplied(ByteArray timerKey, HybridTimestamp timestamp, DistributionZoneTimer timer) { + DistributionZoneTimer newValue = timer.timestamp.longValue() > timestamp.longValue() ? timer : DEFAULT_TIMER; + + return renewTimer(timerKey, newValue); + } + + public static Operation clearTimer(ByteArray timerKey) { + return put(timerKey, DistributionZoneTimerSerializer.serialize(DEFAULT_TIMER)); + } + + @Nullable + private T getValueFromMetaStorageLocally(ByteArray key, Function deserializer) { + return deserializeEntry(metaStorageManager.getLocally(key), deserializer); + } + + private CompletableFuture getValueFromMetaStorage(ByteArray key, Function deserializer) { + return metaStorageManager.get(key).thenApply(e -> deserializeEntry(e, deserializer)); + } + + @Nullable + private static T deserializeEntry(@Nullable Entry e, Function deserializer) { + if (e == null || e.value() == null || e.empty() || e.tombstone()) { + return null; + } else { + return deserializer.apply(e.value()); + } + } + + private CompletableFuture msInvokeWithRetry(Supplier iifSupplier) { + Iif iif = iifSupplier.get(); + + if (iif == null) { + return completedFuture(null); + } + + return metaStorageManager.invoke(iif) + .handle((v, e) -> { + if (e == null) { + if (v.getAsBoolean()) { + return null; + } else { + return msInvokeWithRetry(iifSupplier); + } + } else { + LOG.warn("Failed to perform meta storage invoke", e); + + return null; + } + }) + .thenCompose(Function.identity()); + } + + void onZoneDrop(int zoneId, HybridTimestamp timestamp) { + ZoneTimers zt = zoneTimers.remove(zoneId); + if (zt != null) { + zt.stopAllTimers(); + } + } + + public static class DataNodesHistory { + final NavigableMap> history; + + public DataNodesHistory() { + this(new TreeMap<>()); + } + + private DataNodesHistory(NavigableMap> history) { + this.history = history; + } + } + + private static class DataNodesHistorySerializer extends VersionedSerializer { + private static final DataNodesHistorySerializer INSTANCE = new DataNodesHistorySerializer(); + + @Override + protected void writeExternalData(DataNodesHistory object, IgniteDataOutput out) throws IOException { + out.writeMap( + object.history, + (k, out0) -> out0.writeLong(k.longValue()), + (v, out0) -> out0.writeCollection(v, NodeWithAttributesSerializer.INSTANCE::writeExternal) + ); + } + + @Override + protected DataNodesHistory readExternalData(byte protoVer, IgniteDataInput in) throws IOException { + NavigableMap> history = in.readMap( + TreeMap::new, + in0 -> HybridTimestamp.hybridTimestamp(in0.readLong()), + in0 -> in0.readCollection( + HashSet::new, + NodeWithAttributesSerializer.INSTANCE::readExternal + ) + ); + + return new DataNodesHistory(history); + } + + static byte[] serialize(DataNodesHistory dataNodesHistory) { + return VersionedSerialization.toBytes(dataNodesHistory, INSTANCE); + } + + static DataNodesHistory deserialize(byte[] bytes) { + return VersionedSerialization.fromBytes(bytes, INSTANCE); + } + } + + private static class DistributionZoneTimer { + final HybridTimestamp timestamp; + + final Set nodes; + + private DistributionZoneTimer(HybridTimestamp timestamp, Set nodes) { + this.timestamp = timestamp; + this.nodes = nodes; + } + } + + private static class DistributionZoneTimerSerializer extends VersionedSerializer { + /** Serializer instance. */ + private static final DistributionZoneTimerSerializer INSTANCE = new DistributionZoneTimerSerializer(); + + @Override + protected void writeExternalData(DistributionZoneTimer object, IgniteDataOutput out) throws IOException { + out.writeLong(object.timestamp.longValue()); + out.writeCollection(object.nodes, NodeWithAttributesSerializer.INSTANCE::writeExternal); + } + + @Override + protected DistributionZoneTimer readExternalData(byte protoVer, IgniteDataInput in) throws IOException { + HybridTimestamp timestamp = HybridTimestamp.hybridTimestamp(in.readLong()); + Set nodes = in.readCollection(HashSet::new, NodeWithAttributesSerializer.INSTANCE::readExternal); + + return new DistributionZoneTimer(timestamp, nodes); + } + + static byte[] serialize(DistributionZoneTimer timer) { + return VersionedSerialization.toBytes(timer, INSTANCE); + } + + static DistributionZoneTimer deserialize(byte[] bytes) { + return VersionedSerialization.fromBytes(bytes, INSTANCE); + } + } + + private class ZoneTimerSchedule { + final int zoneId; + ScheduledFuture taskFuture; + long delay; + + ZoneTimerSchedule(int zoneId) { + this.zoneId = zoneId; + } + + synchronized void reschedule(long delayInSeconds, Runnable task) { + stopScheduledTask(); + + taskFuture = executor.schedule(task, delayInSeconds, SECONDS); + delay = delayInSeconds; + } + + synchronized void stopScheduledTask() { + if (taskFuture != null && delay > 0) { + taskFuture.cancel(false); + + delay = 0; + } + } + } + + private class ZoneTimers { + final ZoneTimerSchedule scaleUp; + final ZoneTimerSchedule scaleDown; + final ZoneTimerSchedule partitionReset; + + ZoneTimers(int zoneId) { + this.scaleUp = new ZoneTimerSchedule(zoneId); + this.scaleDown = new ZoneTimerSchedule(zoneId); + this.partitionReset = new ZoneTimerSchedule(zoneId); + } + + void stopAllTimers() { + scaleUp.stopScheduledTask(); + scaleDown.stopScheduledTask(); + partitionReset.stopScheduledTask(); + } + } +} diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java index c7ddb558287..2526e5a416d 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java @@ -25,33 +25,25 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; -import static org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE; import static org.apache.ignite.internal.catalog.descriptors.ConsistencyMode.HIGH_AVAILABILITY; import static org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_ALTER; import static org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_CREATE; import static org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_DROP; +import static org.apache.ignite.internal.distributionzones.DataNodesManager.addNewEntryToDataNodesHistory; +import static org.apache.ignite.internal.distributionzones.DataNodesManager.clearTimer; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT_DEFAULT_VALUE; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.conditionForRecoverableStateChanges; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.conditionForZoneCreation; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.conditionForZoneRemoval; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.createZoneManagerExecutor; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesAndTriggerKeys; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeLogicalTopologySet; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractChangeTriggerRevision; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractDataNodes; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.triggerScaleUpScaleDownKeysCondition; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndScaleDownTriggerKey; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndScaleUpTriggerKey; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndTriggerKeys; +import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateLogicalTopologyAndVersion; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateLogicalTopologyAndVersionAndClusterId; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneTopologyAugmentation; +import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesHistoryKey; +import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonePartitionResetTimerKey; +import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownTimerKey; +import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpTimerKey; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLastHandledTopology; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyClusterIdKey; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey; @@ -59,10 +51,14 @@ import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesNodesAttributes; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesRecoverableStateRevision; +import static org.apache.ignite.internal.metastorage.dsl.Conditions.and; +import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists; import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; +import static org.apache.ignite.internal.metastorage.dsl.Conditions.notTombstone; import static org.apache.ignite.internal.metastorage.dsl.Conditions.value; import static org.apache.ignite.internal.metastorage.dsl.Operations.ops; import static org.apache.ignite.internal.metastorage.dsl.Operations.put; +import static org.apache.ignite.internal.metastorage.dsl.Operations.remove; import static org.apache.ignite.internal.metastorage.dsl.Statements.iif; import static org.apache.ignite.internal.util.ByteUtils.bytesToLongKeepingOrder; import static org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder; @@ -76,7 +72,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -86,7 +81,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.internal.catalog.CatalogManager; @@ -102,16 +96,15 @@ import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; import org.apache.ignite.internal.configuration.SystemDistributedConfiguration; import org.apache.ignite.internal.configuration.utils.SystemDistributedConfigurationPropertyHolder; -import org.apache.ignite.internal.distributionzones.causalitydatanodes.CausalityDataNodesEngine; +import org.apache.ignite.internal.distributionzones.DataNodesManager.DataNodesHistory; import org.apache.ignite.internal.distributionzones.events.HaZoneTopologyUpdateEvent; import org.apache.ignite.internal.distributionzones.events.HaZoneTopologyUpdateEventParams; import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException; import org.apache.ignite.internal.distributionzones.rebalance.DistributionZoneRebalanceEngine; import org.apache.ignite.internal.distributionzones.utils.CatalogAlterZoneEventListener; import org.apache.ignite.internal.event.AbstractEventProducer; -import org.apache.ignite.internal.lang.ByteArray; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteInternalException; -import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; @@ -122,11 +115,9 @@ import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.Revisions; import org.apache.ignite.internal.metastorage.WatchListener; -import org.apache.ignite.internal.metastorage.dsl.CompoundCondition; import org.apache.ignite.internal.metastorage.dsl.Condition; import org.apache.ignite.internal.metastorage.dsl.Iif; import org.apache.ignite.internal.metastorage.dsl.Operation; -import org.apache.ignite.internal.metastorage.dsl.SimpleCondition; import org.apache.ignite.internal.metastorage.dsl.StatementResult; import org.apache.ignite.internal.metastorage.dsl.Update; import org.apache.ignite.internal.metastorage.exceptions.CompactedException; @@ -158,29 +149,10 @@ public class DistributionZoneManager extends /** Executor for scheduling tasks for scale up and scale down processes. */ private final StripedScheduledThreadPoolExecutor executor; - /** - * Map with states for distribution zones. States are needed to track nodes that we want to add or remove from the data nodes, - * schedule and stop scale up and scale down processes. - */ - private final Map zonesState = new ConcurrentHashMap<>(); + private DataNodesManager dataNodesManager; /** Listener for a topology events. */ - private final LogicalTopologyEventListener topologyEventListener = new LogicalTopologyEventListener() { - @Override - public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot newTopology) { - updateLogicalTopologyInMetaStorage(newTopology); - } - - @Override - public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) { - updateLogicalTopologyInMetaStorage(newTopology); - } - - @Override - public void onTopologyLeap(LogicalTopologySnapshot newTopology) { - updateLogicalTopologyInMetaStorage(newTopology); - } - }; + private final LogicalTopologyEventListener topologyEventListener = new DistributionZoneManagerLogicalTopologyEventListener(); /** * The logical topology mapped to the MS revision. @@ -203,15 +175,9 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) { /** Rebalance engine. */ private final DistributionZoneRebalanceEngine rebalanceEngine; - /** Causality data nodes engine. */ - private final CausalityDataNodesEngine causalityDataNodesEngine; - /** Catalog manager. */ private final CatalogManager catalogManager; - /** Executor for scheduling rebalances. */ - private final ScheduledExecutorService rebalanceScheduler; - /** Configuration of HA mode. */ private final SystemDistributedConfigurationPropertyHolder partitionDistributionResetTimeoutConfiguration; @@ -231,7 +197,6 @@ public DistributionZoneManager( MetaStorageManager metaStorageManager, LogicalTopologyService logicalTopologyService, CatalogManager catalogManager, - ScheduledExecutorService rebalanceScheduler, SystemDistributedConfiguration systemDistributedConfiguration ) { this.metaStorageManager = metaStorageManager; @@ -240,8 +205,6 @@ public DistributionZoneManager( this.topologyWatchListener = createMetastorageTopologyListener(); - this.rebalanceScheduler = rebalanceScheduler; - executor = createZoneManagerExecutor( Math.min(Runtime.getRuntime().availableProcessors() * 3, 20), NamedThreadFactory.create(nodeName, "dst-zones-scheduler", LOG) @@ -257,16 +220,6 @@ public DistributionZoneManager( catalogManager ); - //noinspection ThisEscapedInObjectConstruction - causalityDataNodesEngine = new CausalityDataNodesEngine( - busyLock, - registry, - metaStorageManager, - zonesState, - this, - catalogManager - ); - partitionDistributionResetTimeoutConfiguration = new SystemDistributedConfigurationPropertyHolder<>( systemDistributedConfiguration, this::onUpdatePartitionDistributionResetBusy, @@ -305,7 +258,6 @@ public CompletableFuture startAsync(ComponentContext componentContext) { int catalogVersion = catalogManager.latestCatalogVersion(); return allOf( - createOrRestoreZonesStates(recoveryRevision, catalogVersion), restoreLogicalTopologyChangeEventAndStartTimers(recoveryRevision, catalogVersion) ).thenComposeAsync((notUsed) -> rebalanceEngine.startAsync(catalogVersion), componentContext.executor()); }); @@ -319,7 +271,7 @@ public CompletableFuture stopAsync(ComponentContext componentContext) { busyLock.block(); - zonesState.values().forEach(ZoneState::stopTimers); + dataNodesManager.stop(); rebalanceEngine.stop(); @@ -328,7 +280,6 @@ public CompletableFuture stopAsync(ComponentContext componentContext) { metaStorageManager.unregisterWatch(topologyWatchListener); shutdownAndAwaitTermination(executor, 10, SECONDS); - shutdownAndAwaitTermination(rebalanceScheduler, 10, SECONDS); return nullCompletedFuture(); } @@ -352,11 +303,31 @@ public CompletableFuture stopAsync(ComponentContext componentContext) { * @return The future with data nodes for the zoneId. */ public CompletableFuture> dataNodes(long causalityToken, int catalogVersion, int zoneId) { - return causalityDataNodesEngine.dataNodes(causalityToken, catalogVersion, zoneId); + HybridTimestamp timestamp = metaStorageManager.timestampByRevisionLocally(causalityToken); + + return dataNodesManager.dataNodes(zoneId, timestamp); } private CompletableFuture onUpdateScaleUpBusy(AlterZoneEventParameters parameters) { - int zoneId = parameters.zoneDescriptor().id(); + HybridTimestamp timestamp = metaStorageManager.timestampByRevisionLocally(parameters.causalityToken()); + Entry topologyEntry = metaStorageManager.getLocally(zonesLogicalTopologyKey(), parameters.causalityToken()); + + if (topologyEntry != null) { + Set logicalTopology = deserializeLogicalTopologySet(topologyEntry.value()); + + return dataNodesManager.onAutoAdjustAlteration( + parameters.zoneDescriptor(), + timestamp, + parameters.previousDescriptor().dataNodesAutoAdjustScaleUp(), + parameters.previousDescriptor().dataNodesAutoAdjustScaleDown(), + logicalTopology + ); + } + + // TODO ??? + return nullCompletedFuture(); + + /*int zoneId = parameters.zoneDescriptor().id(); int newScaleUp = parameters.zoneDescriptor().dataNodesAutoAdjustScaleUp(); @@ -388,7 +359,7 @@ private CompletableFuture onUpdateScaleUpBusy(AlterZoneEventParameters par zoneState.stopScaleUp(); } - return nullCompletedFuture(); + return nullCompletedFuture();*/ } private void onUpdatePartitionDistributionResetBusy( @@ -413,40 +384,46 @@ private void onUpdatePartitionDistributionResetBusy( return; } + int catalogVersion = catalogManager.activeCatalogVersion(updateTimestamp); + // It is safe to zoneState.entrySet in term of ConcurrentModification and etc. because meta storage notifications are one-threaded // and this map will be initialized on a manager start or with catalog notification or with distribution configuration changes. - for (Map.Entry zoneStateEntry : zonesState.entrySet()) { - int zoneId = zoneStateEntry.getKey(); - CatalogZoneDescriptor zoneDescriptor = catalogManager.zone(zoneId, updateTimestamp); + for (CatalogZoneDescriptor zoneDescriptor : catalogManager.zones(catalogVersion)) { + int zoneId = zoneDescriptor.id(); - if (zoneDescriptor == null || zoneDescriptor.consistencyMode() != HIGH_AVAILABILITY) { + if (zoneDescriptor.consistencyMode() != HIGH_AVAILABILITY) { continue; } - ZoneState zoneState = zoneStateEntry.getValue(); - - if (partitionDistributionResetTimeoutSeconds != INFINITE_TIMER_VALUE) { - Optional highestRevision = zoneState.highestRevision(); - - assert highestRevision.isEmpty() || causalityToken >= highestRevision.get() : IgniteStringFormatter.format( - "Expected causalityToken that is greater or equal to already seen meta storage events: highestRevision={}, " - + "causalityToken={}", - highestRevision.orElse(null), causalityToken - ); - - zoneState.reschedulePartitionDistributionReset( - partitionDistributionResetTimeoutSeconds, - () -> fireTopologyReduceLocalEvent(causalityToken, zoneId), - zoneId - ); - } else { - zoneState.stopPartitionDistributionReset(); - } + dataNodesManager.onUpdatePartitionDistributionReset( + zoneId, + partitionDistributionResetTimeoutSeconds, + () -> fireTopologyReduceLocalEvent(causalityToken, zoneId) + ); } } private CompletableFuture onUpdateScaleDownBusy(AlterZoneEventParameters parameters) { - int zoneId = parameters.zoneDescriptor().id(); + HybridTimestamp timestamp = metaStorageManager.timestampByRevisionLocally(parameters.causalityToken()); + Entry topologyEntry = metaStorageManager.getLocally(zonesLogicalTopologyKey(), parameters.causalityToken()); + + if (topologyEntry != null) { + Set logicalTopology = deserializeLogicalTopologySet(topologyEntry.value()); + + return dataNodesManager.onAutoAdjustAlteration( + parameters.zoneDescriptor(), + timestamp, + parameters.previousDescriptor().dataNodesAutoAdjustScaleUp(), + parameters.previousDescriptor().dataNodesAutoAdjustScaleDown(), + logicalTopology + ); + } + + // TODO ??? + return nullCompletedFuture(); + + + /*int zoneId = parameters.zoneDescriptor().id(); int newScaleDown = parameters.zoneDescriptor().dataNodesAutoAdjustScaleDown(); @@ -478,110 +455,30 @@ private CompletableFuture onUpdateScaleDownBusy(AlterZoneEventParameters p zoneState.stopScaleDown(); } - return nullCompletedFuture(); + return nullCompletedFuture();*/ } private CompletableFuture onUpdateFilter(AlterZoneEventParameters parameters) { - int zoneId = parameters.zoneDescriptor().id(); - - long causalityToken = parameters.causalityToken(); - - return saveDataNodesToMetaStorageOnScaleUp(zoneId, causalityToken); - } - - /** - * Restores zones' states. - * - * @param zone Zone descriptor. - * @param causalityToken Causality token. - * @return Future reflecting the completion of creation or restoring a zone. - */ - private CompletableFuture restoreZoneStateBusy(CatalogZoneDescriptor zone, long causalityToken) { - int zoneId = zone.id(); - - Entry zoneDataNodesLocalMetaStorage = metaStorageManager.getLocally(zoneDataNodesKey(zoneId), causalityToken); - - if (zoneDataNodesLocalMetaStorage.value() == null) { - // In this case, creation of a zone was interrupted during restart. - return onCreateZone(zone, causalityToken); - } else { - Entry topologyAugmentationMapLocalMetaStorage = metaStorageManager.getLocally(zoneTopologyAugmentation(zoneId), causalityToken); - - ConcurrentSkipListMap topologyAugmentationMap; - - if (topologyAugmentationMapLocalMetaStorage.value() == null) { - // This case means that there won't any logical topology updates before restart. - topologyAugmentationMap = new ConcurrentSkipListMap<>(); - } else { - topologyAugmentationMap = TopologyAugmentationMapSerializer.deserialize(topologyAugmentationMapLocalMetaStorage.value()); - } + HybridTimestamp timestamp = metaStorageManager.timestampByRevisionLocally(parameters.causalityToken()); - ZoneState zoneState = new ZoneState(executor, topologyAugmentationMap); + Entry topologyEntry = metaStorageManager.getLocally(zonesLogicalTopologyKey(), parameters.causalityToken()); - ZoneState prevZoneState = zonesState.putIfAbsent(zoneId, zoneState); + if (topologyEntry != null) { + Set logicalTopology = deserializeLogicalTopologySet(topologyEntry.value()); - assert prevZoneState == null : "Zone's state was created twice [zoneId = " + zoneId + ']'; + dataNodesManager.onZoneFilterChange(parameters.zoneDescriptor(), timestamp, logicalTopology); } + /*int zoneId = parameters.zoneDescriptor().id(); - return nullCompletedFuture(); - } - - private CompletableFuture onCreateZone(CatalogZoneDescriptor zone, long causalityToken) { - int zoneId = zone.id(); - - ConcurrentSkipListMap topologyAugmentationMap = new ConcurrentSkipListMap<>(); - - ZoneState zoneState = new ZoneState(executor, topologyAugmentationMap); - - ZoneState prevZoneState = zonesState.putIfAbsent(zoneId, zoneState); - - assert prevZoneState == null : "Zone's state was created twice [zoneId = " + zoneId + ']'; - - Set dataNodes = logicalTopology(causalityToken).stream().map(NodeWithAttributes::node).collect(toSet()); - - causalityDataNodesEngine.onCreateZoneState(causalityToken, zone); + long causalityToken = parameters.causalityToken(); - return initDataNodesAndTriggerKeysInMetaStorage(zoneId, causalityToken, dataNodes); + return saveDataNodesToMetaStorageOnScaleUp(zoneId, causalityToken);*/ } - /** - * Restores timers that were scheduled before a node's restart. Take the highest revision from the - * {@link ZoneState#topologyAugmentationMap()}, schedule scale up/scale down timers. - * - * @param catalogVersion Catalog version. - * @return Future that represents the pending completion of the operation. - * For the immediate timers it will be completed when data nodes will be updated in Meta Storage. - */ - private CompletableFuture restoreTimers(int catalogVersion) { - List> futures = new ArrayList<>(); - - for (CatalogZoneDescriptor zone : catalogManager.zones(catalogVersion)) { - ZoneState zoneState = zonesState.get(zone.id()); - - // Max revision from the {@link ZoneState#topologyAugmentationMap()} for node joins. - Optional maxScaleUpRevisionOptional = zoneState.highestRevision(true); - - // Max revision from the {@link ZoneState#topologyAugmentationMap()} for node removals. - Optional maxScaleDownRevisionOptional = zoneState.highestRevision(false); - - maxScaleUpRevisionOptional.ifPresent( - maxScaleUpRevision -> { - // Take the highest revision from the topologyAugmentationMap and schedule scale up/scale down, - // meaning that all augmentations of nodes will be taken into account in newly created timers. - // If augmentations have already been proposed to data nodes in the metastorage before restart, - // that means we have updated corresponding trigger key and it's value will be greater or equal to - // the highest revision from the topologyAugmentationMap, and current timer won't affect data nodes. - - futures.add(scheduleTimers(zone, true, false, maxScaleUpRevision)); - } - ); - - maxScaleDownRevisionOptional.ifPresent( - maxScaleDownRevision -> futures.add(scheduleTimers(zone, false, true, maxScaleDownRevision)) - ); - } + private CompletableFuture onCreateZone(CatalogZoneDescriptor zone, long causalityToken) { + HybridTimestamp timestamp = metaStorageManager.timestampByRevisionLocally(causalityToken); - return allOf(futures.toArray(CompletableFuture[]::new)); + return initDataNodesKeysInMetaStorage(zone.id(), timestamp, filterDataNodes(logicalTopology(causalityToken), zone)); } /** @@ -590,14 +487,14 @@ private CompletableFuture restoreTimers(int catalogVersion) { * if it passes the condition. It is called on the first creation of a zone. * * @param zoneId Unique id of a zone - * @param revision Revision of an event that has triggered this method. + * @param timestamp Timestamp of an event that has triggered this method. * @param dataNodes Data nodes. * @return Future reflecting the completion of initialisation of zone's keys in meta storage. */ - private CompletableFuture initDataNodesAndTriggerKeysInMetaStorage( + private CompletableFuture initDataNodesKeysInMetaStorage( int zoneId, - long revision, - Set dataNodes + HybridTimestamp timestamp, + Set dataNodes ) { if (!busyLock.enterBusy()) { throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()); @@ -605,43 +502,47 @@ private CompletableFuture initDataNodesAndTriggerKeysInMetaStorage( try { // Update data nodes for a zone only if the corresponding data nodes keys weren't initialised in ms yet. - CompoundCondition triggerKeyCondition = conditionForZoneCreation(zoneId); - - Update dataNodesAndTriggerKeyUpd = updateDataNodesAndTriggerKeys( - zoneId, - revision, - DataNodesMapSerializer.serialize(toDataNodesMap(dataNodes)) + Condition condition = and( + notExists(zoneDataNodesHistoryKey(zoneId)), + notTombstone(zoneDataNodesHistoryKey(zoneId)) ); - Iif iif = iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd, ops().yield(false)); + Update update = ops( + addNewEntryToDataNodesHistory(zoneId, new DataNodesHistory(), timestamp, dataNodes), + clearTimer(zoneScaleUpTimerKey(zoneId)), + clearTimer(zoneScaleDownTimerKey(zoneId)), + clearTimer(zonePartitionResetTimerKey(zoneId)) + ).yield(true); + + Iif iif = iif(condition, update, ops().yield(false)); return metaStorageManager.invoke(iif) .thenApply(StatementResult::getAsBoolean) .whenComplete((invokeResult, e) -> { if (e != null) { LOG.error( - "Failed to update zones' dataNodes value [zoneId = {}, dataNodes = {}, revision = {}]", + "Failed to update zones' dataNodes value [zoneId = {}, dataNodes = {}, timestamp = {}]", e, zoneId, dataNodes, - revision + timestamp ); } else if (invokeResult) { - LOG.info("Update zones' dataNodes value [zoneId = {}, dataNodes = {}, revision = {}]", + LOG.info("Update zones' dataNodes value [zoneId = {}, dataNodes = {}, timestamp = {}]", zoneId, dataNodes, - revision + timestamp ); } else { LOG.debug( - "Failed to update zones' dataNodes value [zoneId = {}, dataNodes = {}, revision = {}]", + "Failed to update zones' dataNodes value [zoneId = {}, dataNodes = {}, timestamp = {}]", zoneId, dataNodes, - revision + timestamp ); } }).thenCompose((ignored) -> nullCompletedFuture()); - } finally { + } finally { busyLock.leaveBusy(); } } @@ -650,34 +551,39 @@ private CompletableFuture initDataNodesAndTriggerKeysInMetaStorage( * Method deletes data nodes value for the specified zone. * * @param zoneId Unique id of a zone - * @param revision Revision of an event that has triggered this method. + * @param timestamp Timestamp of an event that has triggered this method. */ - private CompletableFuture removeTriggerKeysAndDataNodes(int zoneId, long revision) { + private CompletableFuture removeDataNodesKeys(int zoneId, HybridTimestamp timestamp) { if (!busyLock.enterBusy()) { throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()); } try { - SimpleCondition triggerKeyCondition = conditionForZoneRemoval(zoneId); + Condition condition = exists(zoneDataNodesHistoryKey(zoneId)); - Update removeKeysUpd = deleteDataNodesAndTriggerKeys(zoneId, revision); + Update removeKeysUpd = ops( + remove(zoneDataNodesHistoryKey(zoneId)), + remove(zoneScaleUpTimerKey(zoneId)), + remove(zoneScaleDownTimerKey(zoneId)), + remove(zonePartitionResetTimerKey(zoneId)) + ).yield(true); - Iif iif = iif(triggerKeyCondition, removeKeysUpd, ops().yield(false)); + Iif iif = iif(condition, removeKeysUpd, ops().yield(false)); return metaStorageManager.invoke(iif) .thenApply(StatementResult::getAsBoolean) .whenComplete((invokeResult, e) -> { if (e != null) { LOG.error( - "Failed to delete zone's dataNodes keys [zoneId = {}, revision = {}]", + "Failed to delete zone's dataNodes keys [zoneId = {}, timestamp = {}]", e, zoneId, - revision + timestamp ); } else if (invokeResult) { - LOG.info("Delete zone's dataNodes keys [zoneId = {}, revision = {}]", zoneId, revision); + LOG.info("Delete zone's dataNodes keys [zoneId = {}, timestamp = {}]", zoneId, timestamp); } else { - LOG.debug("Failed to delete zone's dataNodes keys [zoneId = {}, revision = {}]", zoneId, revision); + LOG.debug("Failed to delete zone's dataNodes keys [zoneId = {}, timestamp = {}]", zoneId, timestamp); } }) .thenCompose(ignored -> nullCompletedFuture()); @@ -794,28 +700,30 @@ private WatchListener createMetastorageTopologyListener() { byte[] newLogicalTopologyBytes; Set newLogicalTopology = null; + Set oldLogicalTopology = emptySet(); - long revision = 0; + HybridTimestamp timestamp = evt.timestamp(); for (EntryEvent event : evt.entryEvents()) { Entry e = event.newEntry(); - if (Arrays.equals(e.key(), zonesLogicalTopologyVersionKey().bytes())) { - revision = e.revision(); - } else if (Arrays.equals(e.key(), zonesLogicalTopologyKey().bytes())) { + if (Arrays.equals(e.key(), zonesLogicalTopologyKey().bytes())) { newLogicalTopologyBytes = e.value(); + assert newLogicalTopologyBytes != null : "New topology is null."; + newLogicalTopology = deserializeLogicalTopologySet(newLogicalTopologyBytes); + + Entry oldEntry = event.oldEntry(); + if (oldEntry != null && oldEntry.value() != null && !oldEntry.empty() && !oldEntry.tombstone()) { + oldLogicalTopology = deserializeLogicalTopologySet(oldEntry.value()); + } } } assert newLogicalTopology != null : "The event doesn't contain logical topology"; - assert revision > 0 : "The event doesn't contain logical topology version"; - - // It is safe to get the latest version of the catalog as we are in the metastore thread. - int catalogVersion = catalogManager.latestCatalogVersion(); - return onLogicalTopologyUpdate(newLogicalTopology, revision, catalogVersion); + return onLogicalTopologyUpdate(newLogicalTopology, oldLogicalTopology, timestamp); } finally { busyLock.leaveBusy(); } @@ -833,22 +741,24 @@ private static String entryKeyAsString(EntryEvent entry) { * Note that all futures of Meta Storage updates that happen in this method are returned from this method. * * @param newLogicalTopology New logical topology. - * @param revision Revision of the logical topology update. - * @param catalogVersion Actual version of the Catalog. + * @param oldLogicalTopology Old logical topology. + * @param timestamp Event timestamp. * @return Future reflecting the completion of the actions needed when logical topology was updated. */ - private CompletableFuture onLogicalTopologyUpdate(Set newLogicalTopology, long revision, int catalogVersion) { - Set currentLogicalTopology = logicalTopology(revision); - + private CompletableFuture onLogicalTopologyUpdate( + Set newLogicalTopology, + Set oldLogicalTopology, + HybridTimestamp timestamp + ) { Set removedNodes = - currentLogicalTopology.stream() + oldLogicalTopology.stream() .filter(node -> !newLogicalTopology.contains(node)) .map(NodeWithAttributes::node) .collect(toSet()); Set addedNodes = newLogicalTopology.stream() - .filter(node -> !currentLogicalTopology.contains(node)) + .filter(node -> !oldLogicalTopology.contains(node)) .map(NodeWithAttributes::node) .collect(toSet()); @@ -856,43 +766,28 @@ private CompletableFuture onLogicalTopologyUpdate(Set List> futures = new ArrayList<>(); - logicalTopologyByRevision.put(revision, newLogicalTopology); + int catalogVersion = catalogManager.activeCatalogVersion(timestamp.longValue()); for (CatalogZoneDescriptor zone : catalogManager.zones(catalogVersion)) { - int zoneId = zone.id(); - - updateLocalTopologyAugmentationMap(addedNodes, removedNodes, revision, zoneId); + CompletableFuture f = dataNodesManager.onTopologyChangeZoneHandler( + zone, + timestamp, + oldLogicalTopology, + newLogicalTopology + ); - futures.add(scheduleTimers(zone, !addedNodes.isEmpty(), !removedNodes.isEmpty(), revision)); + futures.add(f); zoneIds.add(zone.id()); } newLogicalTopology.forEach(n -> nodesAttributes.put(n.nodeId(), n)); - futures.add(saveRecoverableStateToMetastorage(zoneIds, revision, newLogicalTopology)); + //futures.add(saveRecoverableStateToMetastorage(zoneIds, revision, newLogicalTopology)); return allOf(futures.toArray(CompletableFuture[]::new)); } - /** - * Update local topology augmentation map with newly added and removed nodes. - * - * @param addedNodes Nodes that was added to a topology and should be added to zones data nodes. - * @param removedNodes Nodes that was removed from a topology and should be removed from zones data nodes. - * @param revision Revision of the event that triggered this method. - * @param zoneId Zone's id. - */ - private void updateLocalTopologyAugmentationMap(Set addedNodes, Set removedNodes, long revision, int zoneId) { - if (!addedNodes.isEmpty()) { - zonesState.get(zoneId).nodesToAddToDataNodes(addedNodes, revision); - } - - if (!removedNodes.isEmpty()) { - zonesState.get(zoneId).nodesToRemoveFromDataNodes(removedNodes, revision); - } - } - /** * Saves recoverable state of the Distribution Zone Manager to Meta Storage atomically in one batch. * After restart it could be used to restore these fields. @@ -907,6 +802,7 @@ private CompletableFuture saveRecoverableStateToMetastorage( long revision, Set newLogicalTopology ) { + // TODO Operation[] puts = new Operation[3 + zoneIds.size()]; puts[0] = put(zonesNodesAttributes(), NodesAttributesSerializer.serialize(nodesAttributes())); @@ -920,15 +816,6 @@ private CompletableFuture saveRecoverableStateToMetastorage( int i = 3; - // TODO: https://issues.apache.org/jira/browse/IGNITE-19491 Properly utilise topology augmentation map. Also this map - // TODO: can be saved only once for all zones. - for (Integer zoneId : zoneIds) { - puts[i++] = put( - zoneTopologyAugmentation(zoneId), - TopologyAugmentationMapSerializer.serialize(zonesState.get(zoneId).topologyAugmentationMap()) - ); - } - Iif iif = iif( conditionForRecoverableStateChanges(revision), ops(puts).yield(true), @@ -954,15 +841,20 @@ private CompletableFuture saveRecoverableStateToMetastorage( * @param zone Zone descriptor. * @param nodesAdded Flag indicating that nodes was added to a topology and should be added to zones data nodes. * @param nodesRemoved Flag indicating that nodes was removed from a topology and should be removed from zones data nodes. - * @param revision Revision that triggered that event. + * @param timestamp Timestamp. * @return Future that represents the pending completion of the operation. * For the immediate timers it will be completed when data nodes will be updated in Meta Storage. */ - private CompletableFuture scheduleTimers(CatalogZoneDescriptor zone, boolean nodesAdded, boolean nodesRemoved, long revision) { + private CompletableFuture scheduleTimers( + CatalogZoneDescriptor zone, + boolean nodesAdded, + boolean nodesRemoved, + HybridTimestamp timestamp + ) { int autoAdjust = zone.dataNodesAutoAdjust(); int autoAdjustScaleDown = zone.dataNodesAutoAdjustScaleDown(); int autoAdjustScaleUp = zone.dataNodesAutoAdjustScaleUp(); - int partitionReset = partitionDistributionResetTimeoutConfiguration.currentValue(); + int partitionResetDelay = partitionDistributionResetTimeoutConfiguration.currentValue(); int zoneId = zone.id(); @@ -973,40 +865,20 @@ private CompletableFuture scheduleTimers(CatalogZoneDescriptor zone, boole throw new UnsupportedOperationException("Data nodes auto adjust is not supported."); } else { if (nodesAdded) { - if (autoAdjustScaleUp == IMMEDIATE_TIMER_VALUE) { - futures.add(saveDataNodesToMetaStorageOnScaleUp(zoneId, revision)); - } - if (autoAdjustScaleUp != INFINITE_TIMER_VALUE) { - zonesState.get(zoneId).rescheduleScaleUp( - autoAdjustScaleUp, - () -> saveDataNodesToMetaStorageOnScaleUp(zoneId, revision), - zoneId - ); + } } if (nodesRemoved) { if (zone.consistencyMode() == HIGH_AVAILABILITY) { - if (partitionReset != INFINITE_TIMER_VALUE) { - zonesState.get(zoneId).reschedulePartitionDistributionReset( - partitionReset, - () -> fireTopologyReduceLocalEvent(revision, zoneId), - zoneId - ); - } - } + if (partitionResetDelay != INFINITE_TIMER_VALUE) { - if (autoAdjustScaleDown == IMMEDIATE_TIMER_VALUE) { - futures.add(saveDataNodesToMetaStorageOnScaleDown(zoneId, revision)); + } } if (autoAdjustScaleDown != INFINITE_TIMER_VALUE) { - zonesState.get(zoneId).rescheduleScaleDown( - autoAdjustScaleDown, - () -> saveDataNodesToMetaStorageOnScaleDown(zoneId, revision), - zoneId - ); + } } } @@ -1045,229 +917,6 @@ private void fireTopologyReduceLocalEvent(long revision, int zoneId) { }); } - /** - * Method updates data nodes value for the specified zone after scale up timer timeout, sets {@code revision} to the - * {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} if it passes the condition. - * - * @param zoneId Unique id of a zone - * @param revision Revision of an event that has triggered this method. - */ - CompletableFuture saveDataNodesToMetaStorageOnScaleUp(int zoneId, long revision) { - if (!busyLock.enterBusy()) { - throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()); - } - - try { - ZoneState zoneState = zonesState.get(zoneId); - - if (zoneState == null) { - // Zone was deleted - return nullCompletedFuture(); - } - - Set keysToGetFromMs = Set.of( - zoneDataNodesKey(zoneId), - zoneScaleUpChangeTriggerKey(zoneId), - zoneScaleDownChangeTriggerKey(zoneId) - ); - - return metaStorageManager.getAll(keysToGetFromMs).thenCompose(values -> inBusyLock(busyLock, () -> { - if (values.containsValue(null)) { - // Zone was deleted - return nullCompletedFuture(); - } - - Map dataNodesFromMetaStorage = extractDataNodes(values.get(zoneDataNodesKey(zoneId))); - - long scaleUpTriggerRevision = extractChangeTriggerRevision(values.get(zoneScaleUpChangeTriggerKey(zoneId))); - - long scaleDownTriggerRevision = extractChangeTriggerRevision(values.get(zoneScaleDownChangeTriggerKey(zoneId))); - - if (revision <= scaleUpTriggerRevision) { - LOG.debug( - "Revision of the event is less than the scale up revision from the metastorage " - + "[zoneId = {}, revision = {}, scaleUpTriggerRevision = {}]", - zoneId, - revision, - scaleUpTriggerRevision - ); - - return nullCompletedFuture(); - } - - List deltaToAdd = zoneState.nodesToBeAddedToDataNodes(scaleUpTriggerRevision, revision); - - Map newDataNodes = new HashMap<>(dataNodesFromMetaStorage); - - deltaToAdd.forEach(n -> newDataNodes.merge(n, 1, Integer::sum)); - - // Update dataNodes, so nodeId will be updated with the latest seen data on the node. - // For example, node could be restarted with new nodeId, we need to update it in the data nodes. - deltaToAdd.forEach(n -> newDataNodes.put(n, newDataNodes.remove(n))); - - // Remove redundant nodes that are not presented in the data nodes. - newDataNodes.entrySet().removeIf(e -> e.getValue() == 0); - - Update dataNodesAndTriggerKeyUpd = updateDataNodesAndScaleUpTriggerKey( - zoneId, - revision, - DataNodesMapSerializer.serialize(newDataNodes) - ); - - Iif iif = iif( - triggerScaleUpScaleDownKeysCondition(scaleUpTriggerRevision, scaleDownTriggerRevision, zoneId), - dataNodesAndTriggerKeyUpd, - ops().yield(false) - ); - - return metaStorageManager.invoke(iif) - .thenApply(StatementResult::getAsBoolean) - .thenCompose(invokeResult -> inBusyLock(busyLock, () -> { - if (invokeResult) { - // TODO: https://issues.apache.org/jira/browse/IGNITE-19491 Properly utilise this map - // Currently we call clean up only on a node that successfully writes data nodes. - LOG.info( - "Updating data nodes for a zone after scale up has succeeded " - + "[zoneId = {}, dataNodes = {}, revision = {}]", - zoneId, - newDataNodes, - revision - ); - - zoneState.cleanUp(Math.min(scaleDownTriggerRevision, revision)); - } else { - LOG.debug("Updating data nodes for a zone after scale up has not succeeded " - + "[zoneId = {}, dataNodes = {}, revision = {}]", - zoneId, - newDataNodes, - revision - ); - - return saveDataNodesToMetaStorageOnScaleUp(zoneId, revision); - } - - return nullCompletedFuture(); - })); - })).whenComplete((v, e) -> { - if (e != null) { - LOG.warn("Failed to update zones' dataNodes value after scale up [zoneId = {}, revision = {}]", - e, zoneId, revision); - } - }); - } finally { - busyLock.leaveBusy(); - } - } - - /** - * Method updates data nodes value for the specified zone after scale down timer timeout, sets {@code revision} to the - * {@link DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)} if it passes the condition. - * - * @param zoneId Unique id of a zone - * @param revision Revision of an event that has triggered this method. - */ - CompletableFuture saveDataNodesToMetaStorageOnScaleDown(int zoneId, long revision) { - if (!busyLock.enterBusy()) { - throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()); - } - - try { - ZoneState zoneState = zonesState.get(zoneId); - - if (zoneState == null) { - // Zone was deleted - return nullCompletedFuture(); - } - - Set keysToGetFromMs = Set.of( - zoneDataNodesKey(zoneId), - zoneScaleUpChangeTriggerKey(zoneId), - zoneScaleDownChangeTriggerKey(zoneId) - ); - - return metaStorageManager.getAll(keysToGetFromMs).thenCompose(values -> inBusyLock(busyLock, () -> { - if (values.containsValue(null)) { - // Zone was deleted - return nullCompletedFuture(); - } - - Map dataNodesFromMetaStorage = extractDataNodes(values.get(zoneDataNodesKey(zoneId))); - - long scaleUpTriggerRevision = extractChangeTriggerRevision(values.get(zoneScaleUpChangeTriggerKey(zoneId))); - - long scaleDownTriggerRevision = extractChangeTriggerRevision(values.get(zoneScaleDownChangeTriggerKey(zoneId))); - - if (revision <= scaleDownTriggerRevision) { - LOG.debug( - "Revision of the event is less than the scale down revision from the metastorage " - + "[zoneId = {}, revision = {}, scaleUpTriggerRevision = {}]", - zoneId, - revision, - scaleDownTriggerRevision - ); - - return nullCompletedFuture(); - } - - List deltaToRemove = zoneState.nodesToBeRemovedFromDataNodes(scaleDownTriggerRevision, revision); - - Map newDataNodes = new HashMap<>(dataNodesFromMetaStorage); - - deltaToRemove.forEach(n -> newDataNodes.merge(n, -1, Integer::sum)); - - // Remove redundant nodes that are not presented in the data nodes. - newDataNodes.entrySet().removeIf(e -> e.getValue() == 0); - - Update dataNodesAndTriggerKeyUpd = updateDataNodesAndScaleDownTriggerKey( - zoneId, - revision, - DataNodesMapSerializer.serialize(newDataNodes) - ); - - Iif iif = iif( - triggerScaleUpScaleDownKeysCondition(scaleUpTriggerRevision, scaleDownTriggerRevision, zoneId), - dataNodesAndTriggerKeyUpd, - ops().yield(false) - ); - - return metaStorageManager.invoke(iif) - .thenApply(StatementResult::getAsBoolean) - .thenCompose(invokeResult -> inBusyLock(busyLock, () -> { - if (invokeResult) { - LOG.info( - "Updating data nodes for a zone after scale down has succeeded " - + "[zoneId = {}, dataNodes = {}, revision = {}]", - zoneId, - newDataNodes, - revision - ); - - // TODO: https://issues.apache.org/jira/browse/IGNITE-19491 Properly utilise this map - // Currently we call clean up only on a node that successfully writes data nodes. - zoneState.cleanUp(Math.min(scaleUpTriggerRevision, revision)); - } else { - LOG.debug("Updating data nodes for a zone after scale down has not succeeded " - + "[zoneId = {}, dataNodes = {}, revision = {}]", - zoneId, - newDataNodes, - revision - ); - - return saveDataNodesToMetaStorageOnScaleDown(zoneId, revision); - } - - return nullCompletedFuture(); - })); - })).whenComplete((v, e) -> { - if (e != null) { - LOG.warn("Failed to update zones' dataNodes value after scale down [zoneId = {}]", e, zoneId); - } - }); - } finally { - busyLock.leaveBusy(); - } - } - /** * Class responsible for storing state for a distribution zone. * States are needed to track nodes that we want to add or remove from the data nodes, @@ -1583,7 +1232,7 @@ public Map nodesAttributes() { @TestOnly public Map zonesState() { - return zonesState; + // TODO remove this } public Set logicalTopology() { @@ -1617,17 +1266,6 @@ private void registerCatalogEventListenersOnStartManagerBusy() { catalogManager.listen(ZONE_ALTER, new ManagerCatalogAlterZoneEventListener()); } - private CompletableFuture createOrRestoreZonesStates(long recoveryRevision, int catalogVersion) { - List> futures = new ArrayList<>(); - - // TODO: IGNITE-20287 Clean up abandoned resources for dropped tables from vault and metastore - for (CatalogZoneDescriptor zone : catalogManager.zones(catalogVersion)) { - futures.add(restoreZoneStateBusy(zone, recoveryRevision)); - } - - return allOf(futures.toArray(CompletableFuture[]::new)); - } - /** * Restore the event of the updating the logical topology from Meta Storage, that has not been completed before restart. * Also start scale up/scale down timers. @@ -1646,15 +1284,26 @@ private CompletableFuture restoreLogicalTopologyChangeEventAndStartTimers( Entry lastUpdateRevisionEntry = metaStorageManager.getLocally(zonesRecoverableStateRevision(), recoveryRevision); if (lastUpdateRevisionEntry.value() == null || topologyRevision > bytesToLongKeepingOrder(lastUpdateRevisionEntry.value())) { - return onLogicalTopologyUpdate(newLogicalTopology, recoveryRevision, catalogVersion); - } else { - return restoreTimers(catalogVersion); + HybridTimestamp timestamp = metaStorageManager.timestampByRevisionLocally(recoveryRevision); + + return onLogicalTopologyUpdate(newLogicalTopology, timestamp); } + + dataNodesManager.start(catalogManager.zones(catalogVersion), newLogicalTopology); } return nullCompletedFuture(); } + private CompletableFuture onDropZoneBusy(DropZoneEventParameters parameters) { + long causalityToken = parameters.causalityToken(); + + HybridTimestamp timestamp = metaStorageManager.timestampByRevisionLocally(causalityToken); + + return removeDataNodesKeys(parameters.zoneId(), timestamp) + .thenRun(() -> dataNodesManager.onZoneDrop(parameters.zoneId(), timestamp)); + } + private class ManagerCatalogAlterZoneEventListener extends CatalogAlterZoneEventListener { private ManagerCatalogAlterZoneEventListener() { super(catalogManager); @@ -1676,19 +1325,20 @@ protected CompletableFuture onFilterUpdate(AlterZoneEventParameters parame } } - private CompletableFuture onDropZoneBusy(DropZoneEventParameters parameters) { - int zoneId = parameters.zoneId(); - - long causalityToken = parameters.causalityToken(); - - ZoneState zoneState = zonesState.get(zoneId); - - zoneState.stopTimers(); + private class DistributionZoneManagerLogicalTopologyEventListener implements LogicalTopologyEventListener { + @Override + public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot newTopology) { + updateLogicalTopologyInMetaStorage(newTopology); + } - return removeTriggerKeysAndDataNodes(zoneId, causalityToken).thenRun(() -> { - causalityDataNodesEngine.onDelete(causalityToken, zoneId); + @Override + public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) { + updateLogicalTopologyInMetaStorage(newTopology); + } - zonesState.remove(zoneId); - }); + @Override + public void onTopologyLeap(LogicalTopologySnapshot newTopology) { + updateLogicalTopologyInMetaStorage(newTopology); + } } } diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java index 095c899344b..5cae346c825 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.distributionzones; -import static java.util.Collections.emptyMap; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toSet; @@ -31,7 +30,6 @@ import static org.apache.ignite.internal.metastorage.dsl.Operations.ops; import static org.apache.ignite.internal.metastorage.dsl.Operations.put; import static org.apache.ignite.internal.metastorage.dsl.Operations.remove; -import static org.apache.ignite.internal.util.ByteUtils.bytesToLongKeepingOrder; import static org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder; import static org.apache.ignite.internal.util.ByteUtils.uuidToBytes; @@ -54,9 +52,7 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; -import org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState; import org.apache.ignite.internal.lang.ByteArray; -import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.dsl.CompoundCondition; import org.apache.ignite.internal.metastorage.dsl.Operation; import org.apache.ignite.internal.metastorage.dsl.SimpleCondition; @@ -74,7 +70,10 @@ public class DistributionZonesUtil { private static final String DISTRIBUTION_ZONE_PREFIX = "distributionZone."; /** Key prefix for zone's data nodes and trigger keys. */ - private static final String DISTRIBUTION_ZONE_DATA_NODES_PREFIX = "distributionZone.dataNodes."; + private static final String DISTRIBUTION_ZONE_DATA_NODES_PREFIX = DISTRIBUTION_ZONE_PREFIX + "dataNodes."; + + /** Key prefix for zone's data nodes history. */ + private static final String DISTRIBUTION_ZONE_DATA_NODES_HISTORY_PREFIX = DISTRIBUTION_ZONE_DATA_NODES_PREFIX + "history."; /** Key prefix for zone's data nodes. */ public static final String DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX = DISTRIBUTION_ZONE_DATA_NODES_PREFIX + "value."; @@ -82,6 +81,16 @@ public class DistributionZonesUtil { public static final byte[] DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX_BYTES = DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX.getBytes(StandardCharsets.UTF_8); + /** Key prefix for zone's scale up timer. */ + public static final String DISTRIBUTION_ZONE_SCALE_UP_TIMER_PREFIX = DISTRIBUTION_ZONE_DATA_NODES_PREFIX + "scaleUpTimer."; + + /** Key prefix for zone's scale down timer. */ + public static final String DISTRIBUTION_ZONE_SCALE_DOWN_TIMER_PREFIX = DISTRIBUTION_ZONE_DATA_NODES_PREFIX + "scaleDownTimer."; + + /** Key prefix for zone's partition reset timer. */ + public static final String DISTRIBUTION_ZONE_PARTITION_RESET_TIMER_PREFIX = DISTRIBUTION_ZONE_DATA_NODES_PREFIX + + "partitionResetTimer."; + /** Key prefix for zone's scale up change trigger key. */ private static final String DISTRIBUTION_ZONE_SCALE_UP_CHANGE_TRIGGER_PREFIX = DISTRIBUTION_ZONE_DATA_NODES_PREFIX + "scaleUpChangeTrigger."; @@ -105,19 +114,12 @@ public class DistributionZonesUtil { /** Key prefix for zones' logical topology nodes. */ private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY = DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX + "nodes"; - /** Key prefix for zones' configurations in vault. */ - private static final String DISTRIBUTION_ZONES_VERSIONED_CONFIGURATION_VAULT = "vault." + DISTRIBUTION_ZONE_PREFIX - + "versionedConfiguration."; - /** Key prefix for zones' logical topology version. */ private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION = DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX + "version"; /** Key prefix for zones' logical topology cluster ID. */ private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_CLUSTER_ID = DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX + "clusterId"; - /** Key prefix that represents {@link ZoneState#topologyAugmentationMap()}.*/ - private static final String DISTRIBUTION_ZONES_TOPOLOGY_AUGMENTATION_PREFIX = "distributionZones.topologyAugmentation."; - /** ByteArray representation of {@link DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY}. */ private static final ByteArray DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_KEY = new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY); @@ -140,14 +142,6 @@ public class DistributionZonesUtil { private static final ByteArray DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_CLUSTER_ID_KEY = new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_CLUSTER_ID); - /** - * The initial value of trigger revision in case when it is not initialized in the meta storage. - * It is possible because invoke to metastorage with the initialisation is async, and scale up/down propagation could be - * propagated first. Initial value is -1, because for default zone, we initialise trigger keys with metastorage's applied revision, - * which is 0 on a start. - */ - private static final long INITIAL_TRIGGER_REVISION_VALUE = -1; - /** ByteArray representation of {@link DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}. */ private static final ByteArray DISTRIBUTION_ZONES_DATA_NODES_KEY = new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_PREFIX); @@ -183,22 +177,52 @@ public static ByteArray zoneDataNodesKey(int zoneId) { } /** - * ByteArray representation of {@link DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX}. + * ByteArray representation of {@link DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_HISTORY_PREFIX}. * + * @param zoneId Zone id. * @return ByteArray representation. */ - public static ByteArray zoneDataNodesKey() { - return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX_BYTES); + public static ByteArray zoneDataNodesHistoryKey(int zoneId) { + return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_HISTORY_PREFIX + zoneId); + } + + /** + * ByteArray representation of {@link DistributionZonesUtil#DISTRIBUTION_ZONE_SCALE_UP_TIMER_PREFIX}. + * + * @param zoneId Zone id. + * @return ByteArray representation. + */ + public static ByteArray zoneScaleUpTimerKey(int zoneId) { + return new ByteArray(DISTRIBUTION_ZONE_SCALE_UP_TIMER_PREFIX + zoneId); + } + + /** + * ByteArray representation of {@link DistributionZonesUtil#DISTRIBUTION_ZONE_SCALE_DOWN_TIMER_PREFIX}. + * + * @param zoneId Zone id. + * @return ByteArray representation. + */ + public static ByteArray zoneScaleDownTimerKey(int zoneId) { + return new ByteArray(DISTRIBUTION_ZONE_SCALE_DOWN_TIMER_PREFIX + zoneId); } /** - * ByteArray representation of {@link DistributionZonesUtil#DISTRIBUTION_ZONES_VERSIONED_CONFIGURATION_VAULT}. + * ByteArray representation of {@link DistributionZonesUtil#DISTRIBUTION_ZONE_PARTITION_RESET_TIMER_PREFIX}. * * @param zoneId Zone id. * @return ByteArray representation. */ - public static ByteArray zoneVersionedConfigurationKey(int zoneId) { - return new ByteArray(DISTRIBUTION_ZONES_VERSIONED_CONFIGURATION_VAULT + zoneId); + public static ByteArray zonePartitionResetTimerKey(int zoneId) { + return new ByteArray(DISTRIBUTION_ZONE_PARTITION_RESET_TIMER_PREFIX + zoneId); + } + + /** + * ByteArray representation of {@link DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX}. + * + * @return ByteArray representation. + */ + public static ByteArray zoneDataNodesKey() { + return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX_BYTES); } /** @@ -281,24 +305,17 @@ public static ByteArray zonesLastHandledTopology() { return DISTRIBUTION_ZONES_LAST_HANDLED_TOPOLOGY_KEY; } - /** - * The key that represents {@link ZoneState#topologyAugmentationMap()} in the Meta Storage. - */ - static ByteArray zoneTopologyAugmentation(int zoneId) { - return new ByteArray(DISTRIBUTION_ZONES_TOPOLOGY_AUGMENTATION_PREFIX + zoneId); - } - /** * Condition for creating all data nodes' related keys in Meta Storage. Condition passes only when - * {@link DistributionZonesUtil#zoneDataNodesKey(int)} not exists and not a tombstone in the Meta Storage. + * {@link DistributionZonesUtil#zoneDataNodesHistoryKey(int)} not exists and not a tombstone in the Meta Storage. * - * @param zoneId Distribution zone id + * @param zoneId Distribution zone id. * @return Update condition. */ static CompoundCondition conditionForZoneCreation(int zoneId) { return and( - notExists(zoneDataNodesKey(zoneId)), - notTombstone(zoneDataNodesKey(zoneId)) + notExists(zoneDataNodesHistoryKey(zoneId)), + notTombstone(zoneDataNodesHistoryKey(zoneId)) ); } @@ -323,69 +340,9 @@ static CompoundCondition conditionForRecoverableStateChanges(long revision) { * @return Update condition. */ static SimpleCondition conditionForZoneRemoval(int zoneId) { - return exists(zoneDataNodesKey(zoneId)); - } - - /** - * Condition for updating {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} key. - * Update only if the revision of the event is newer than value in that trigger key. - * - * @param scaleUpTriggerRevision Trigger revision of scale up. - * @param scaleDownTriggerRevision Trigger revision of scale down. - * @param zoneId Zone id. - * @return Update condition. - */ - static CompoundCondition triggerScaleUpScaleDownKeysCondition(long scaleUpTriggerRevision, long scaleDownTriggerRevision, int zoneId) { - SimpleCondition scaleUpCondition; - - if (scaleUpTriggerRevision != INITIAL_TRIGGER_REVISION_VALUE) { - scaleUpCondition = value(zoneScaleUpChangeTriggerKey(zoneId)).eq(longToBytesKeepingOrder(scaleUpTriggerRevision)); - } else { - scaleUpCondition = notExists(zoneScaleUpChangeTriggerKey(zoneId)); - } - - SimpleCondition scaleDownCondition; - - if (scaleDownTriggerRevision != INITIAL_TRIGGER_REVISION_VALUE) { - scaleDownCondition = value(zoneScaleDownChangeTriggerKey(zoneId)).eq(longToBytesKeepingOrder(scaleDownTriggerRevision)); - } else { - scaleDownCondition = notExists(zoneScaleDownChangeTriggerKey(zoneId)); - } - - return and(scaleUpCondition, scaleDownCondition); - } - - /** - * Updates data nodes value for a zone and set {@code revision} to {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)}. - * - * @param zoneId Distribution zone id - * @param revision Revision of the event. - * @param nodes Data nodes. - * @return Update command for the meta storage. - */ - static Update updateDataNodesAndScaleUpTriggerKey(int zoneId, long revision, byte[] nodes) { - return ops( - put(zoneDataNodesKey(zoneId), nodes), - put(zoneScaleUpChangeTriggerKey(zoneId), longToBytesKeepingOrder(revision)) - ).yield(true); - } - - /** - * Updates data nodes value for a zone and set {@code revision} to {@link DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)}. - * - * @param zoneId Distribution zone id - * @param revision Revision of the event. - * @param nodes Data nodes. - * @return Update command for the meta storage. - */ - static Update updateDataNodesAndScaleDownTriggerKey(int zoneId, long revision, byte[] nodes) { - return ops( - put(zoneDataNodesKey(zoneId), nodes), - put(zoneScaleDownChangeTriggerKey(zoneId), longToBytesKeepingOrder(revision)) - ).yield(true); + return exists(zoneDataNodesHistoryKey(zoneId)); } - /** * Updates data nodes value for a zone and set {@code revision} to {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} and * {@link DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)}. @@ -506,34 +463,6 @@ public static Map deserializeNodesAttributes(byte[] by return NodesAttributesSerializer.deserialize(bytes); } - /** - * Returns data nodes from the meta storage entry or empty map if the value is null. - * - * @param dataNodesEntry Meta storage entry with data nodes. - * @return Data nodes. - */ - static Map extractDataNodes(Entry dataNodesEntry) { - if (!dataNodesEntry.empty()) { - return deserializeDataNodesMap(dataNodesEntry.value()); - } else { - return emptyMap(); - } - } - - /** - * Returns a trigger revision from the meta storage entry or {@link INITIAL_TRIGGER_REVISION_VALUE} if the value is null. - * - * @param revisionEntry Meta storage entry with data nodes. - * @return Revision. - */ - static long extractChangeTriggerRevision(Entry revisionEntry) { - if (!revisionEntry.empty()) { - return bytesToLongKeepingOrder(revisionEntry.value()); - } else { - return INITIAL_TRIGGER_REVISION_VALUE; - } - } - /** * Check if {@code nodeAttributes} satisfy the {@code filter}. * @@ -608,6 +537,16 @@ public static boolean filterStorageProfiles( return new HashSet<>(node.storageProfiles()).containsAll(zoneStorageProfilesNames); } + public static Set filterDataNodes( + Set dataNodes, + CatalogZoneDescriptor zoneDescriptor + ) { + return dataNodes.stream() + .filter(n -> filterNodeAttributes(n.userAttributes(), zoneDescriptor.filter())) + .filter(n -> filterStorageProfiles(n, zoneDescriptor.storageProfiles().profiles())) + .collect(toSet()); + } + /** * Filters {@code dataNodes} according to the provided filter and storage profiles from {@code zoneDescriptor}. * Nodes' attributes and storage profiles are taken from {@code nodesAttributes} map. diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java deleted file mode 100644 index 276ca7b7235..00000000000 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java +++ /dev/null @@ -1,540 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.distributionzones.causalitydatanodes; - -import static java.lang.Math.max; -import static java.util.Collections.emptySet; -import static java.util.stream.Collectors.toSet; -import static org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeLogicalTopologySet; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey; -import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey; -import static org.apache.ignite.internal.util.ByteUtils.bytesToLongKeepingOrder; -import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; -import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; - -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import org.apache.ignite.internal.catalog.CatalogManager; -import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; -import org.apache.ignite.internal.catalog.events.CatalogEventParameters; -import org.apache.ignite.internal.causality.IncrementalVersionedValue; -import org.apache.ignite.internal.causality.OutdatedTokenException; -import org.apache.ignite.internal.causality.RevisionListenerRegistry; -import org.apache.ignite.internal.causality.VersionedValue; -import org.apache.ignite.internal.distributionzones.DataNodesMapSerializer; -import org.apache.ignite.internal.distributionzones.DistributionZoneManager; -import org.apache.ignite.internal.distributionzones.DistributionZoneManager.Augmentation; -import org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState; -import org.apache.ignite.internal.distributionzones.DistributionZonesUtil; -import org.apache.ignite.internal.distributionzones.Node; -import org.apache.ignite.internal.distributionzones.NodeWithAttributes; -import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException; -import org.apache.ignite.internal.lang.ByteArray; -import org.apache.ignite.internal.metastorage.Entry; -import org.apache.ignite.internal.metastorage.MetaStorageManager; -import org.apache.ignite.internal.util.IgniteSpinBusyLock; - -/** - * Causality data nodes engine. Contains logic for obtaining zone's data nodes with causality token. - */ -public class CausalityDataNodesEngine { - /** Busy lock to stop synchronously. */ - private final IgniteSpinBusyLock busyLock; - - /** Meta Storage manager. */ - private final MetaStorageManager msManager; - - /** Distribution zones manager. */ - private final DistributionZoneManager distributionZoneManager; - - /** Catalog manager. */ - private final CatalogManager catalogManager; - - /** - * Map with states for distribution zones. States are needed to track nodes that we want to add or remove from the data nodes, - * schedule and stop scale up and scale down processes. - */ - private final Map zonesState; - - /** - * The map which contains zones' create revision. It is updated only if the current node handled zone's create event. - * TODO IGNITE-20050 Clean up this map. - */ - private final ConcurrentHashMap zonesCreateRevision = new ConcurrentHashMap<>(); - - /** Used to guarantee that the synchronous parts of all metastorage listeners for a specified causality token are handled. */ - private final VersionedValue zonesVv; - - /** - * The constructor. - * - * @param busyLock Busy lock to stop synchronously. - * @param registry Registry for versioned values. - * @param msManager Meta Storage manager. - * @param zonesState Map with states for distribution zones. - * @param distributionZoneManager Distribution zones manager. - * @param catalogManager Catalog manager. - */ - public CausalityDataNodesEngine( - IgniteSpinBusyLock busyLock, - RevisionListenerRegistry registry, - MetaStorageManager msManager, - Map zonesState, - DistributionZoneManager distributionZoneManager, - CatalogManager catalogManager - ) { - this.busyLock = busyLock; - this.msManager = msManager; - this.zonesState = zonesState; - this.distributionZoneManager = distributionZoneManager; - this.catalogManager = catalogManager; - - zonesVv = new IncrementalVersionedValue<>(registry); - } - - /** - * Gets data nodes of the zone using causality token and catalog version. {@code causalityToken} must be agreed - * with the {@code catalogVersion}, meaning that for the provided {@code causalityToken} actual {@code catalogVersion} must be provided. - * For example, if you are in the meta storage watch thread and {@code causalityToken} is the revision of the watch event, it is - * safe to take {@link CatalogManager#latestCatalogVersion()} as a {@code catalogVersion}, - * because {@link CatalogManager#latestCatalogVersion()} won't be updated in a watch thread. - * The same is applied for {@link CatalogEventParameters}, it is safe to take {@link CatalogEventParameters#causalityToken()} - * as a {@code causalityToken} and {@link CatalogEventParameters#catalogVersion()} as a {@code catalogVersion}. - * - *

Return data nodes or throw the exception: - * {@link IllegalArgumentException} if causalityToken or zoneId is not valid. - * {@link DistributionZoneNotFoundException} if the zone with the provided zoneId does not exist. - * - * @param causalityToken Causality token. - * @param catalogVersion Catalog version. - * @param zoneId Zone id. - * @return The future with data nodes for the zoneId. - */ - public CompletableFuture> dataNodes(long causalityToken, int catalogVersion, int zoneId) { - if (causalityToken < 1) { - throw new IllegalArgumentException("causalityToken must be greater then zero [causalityToken=" + causalityToken + '"'); - } - - if (catalogVersion < 0) { - throw new IllegalArgumentException("catalogVersion must be greater or equal to zero [catalogVersion=" + catalogVersion + '"'); - } - - if (zoneId < 0) { - throw new IllegalArgumentException("zoneId cannot be a negative number [zoneId=" + zoneId + '"'); - } - - return inBusyLock(busyLock, () -> { - try { - return zonesVv.get(causalityToken); - } catch (OutdatedTokenException e) { - // This exception means that the DistributionZoneManager has already processed event with the causalityToken. - return nullCompletedFuture(); - } - }).thenApply(ignored -> inBusyLock(busyLock, () -> { - CatalogZoneDescriptor zoneDescriptor = catalogManager.catalog(catalogVersion).zone(zoneId); - - if (zoneDescriptor == null) { - // It means that the zone does not exist on a given causality token or causality token is lower than metastorage recovery - // revision. - throw new DistributionZoneNotFoundException(zoneId); - } - - Long createRevision = zonesCreateRevision.get(zoneId); - - long descLastUpdateRevision = zoneDescriptor.updateToken(); - - // Get revisions of the last scale up and scale down event which triggered immediate data nodes recalculation. - long lastScaleUpRevision = getRevisionsOfLastScaleUpEvent(causalityToken, catalogVersion, zoneId); - long lastScaleDownRevision = getRevisionsOfLastScaleDownEvent(causalityToken, catalogVersion, zoneId); - - Entry dataNodes = msManager.getLocally(zoneDataNodesKey(zoneId), causalityToken); - - if (createRevision != null && createRevision.equals(descLastUpdateRevision) - && descLastUpdateRevision >= lastScaleUpRevision - && descLastUpdateRevision >= lastScaleDownRevision - && dataNodes.empty() - ) { - // It means that the zone was created but the data nodes value had not been updated yet. - // So the data nodes value will be equals to the logical topology on the descLastUpdateRevision. - Entry topologyEntry = msManager.getLocally(zonesLogicalTopologyKey(), createRevision); - - if (topologyEntry.empty()) { - // A special case for a very first start of a node, when a creation of a zone was before the first topology event, - // meaning that zonesLogicalTopologyKey() has not been updated yet, safe to return empty set here. - return emptySet(); - } - - Set logicalTopology = deserializeLogicalTopologySet(topologyEntry.value()); - - Set logicalTopologyNodes = logicalTopology.stream().map(n -> n.node()).collect(toSet()); - - return filterDataNodes(logicalTopologyNodes, zoneDescriptor, distributionZoneManager.nodesAttributes()); - } - - ZoneState zoneState = zonesState.get(zoneId); - - ConcurrentSkipListMap subAugmentationMap = null; - - // On the data nodes recalculation we write new data nodes to the meta storage then clear the augmentation map. - // Therefore, first we need to read the augmentation map before it is cleared and then read the last data nodes value - // from the meta storage. - // The zoneState can be null if the zone was removed. - if (zoneState != null) { - subAugmentationMap = new ConcurrentSkipListMap<>(zoneState.topologyAugmentationMap() - .headMap(causalityToken, true)); - } - - // Search the revisions of zoneScaleUpChangeTriggerKey and zoneScaleDownChangeTriggerKey with value greater or equals - // to expected one. - long scaleUpDataNodesRevision = searchTriggerKey(lastScaleUpRevision, zoneId, zoneScaleUpChangeTriggerKey(zoneId)); - long scaleDownDataNodesRevision = searchTriggerKey(lastScaleDownRevision, zoneId, zoneScaleDownChangeTriggerKey(zoneId)); - - // Choose the highest revision. - long dataNodesRevision = max(causalityToken, max(scaleUpDataNodesRevision, scaleDownDataNodesRevision)); - - // Read data nodes value from the meta storage on dataNodesRevision and associated trigger keys. - Entry dataNodesEntry = msManager.getLocally(zoneDataNodesKey(zoneId), dataNodesRevision); - Entry scaleUpChangeTriggerKey = msManager.getLocally(zoneScaleUpChangeTriggerKey(zoneId), dataNodesRevision); - Entry scaleDownChangeTriggerKey = msManager.getLocally(zoneScaleDownChangeTriggerKey(zoneId), dataNodesRevision); - - if (dataNodesEntry.value() == null) { - // The zone was removed. - // In this case it is impossible to find out the data nodes value idempotently. - return emptySet(); - } - - Set baseDataNodes = DistributionZonesUtil.dataNodes( - DataNodesMapSerializer.deserialize(dataNodesEntry.value()) - ); - long scaleUpTriggerRevision = bytesToLongKeepingOrder(scaleUpChangeTriggerKey.value()); - long scaleDownTriggerRevision = bytesToLongKeepingOrder(scaleDownChangeTriggerKey.value()); - - Set finalDataNodes = new HashSet<>(baseDataNodes); - - // If the subAugmentationMap is null then it means that the zone was removed. - // In this case all nodes from topologyAugmentationMap must be already written to the meta storage. - if (subAugmentationMap != null) { - // Update the data nodes set with pending data from augmentation map - subAugmentationMap.forEach((rev, augmentation) -> { - if (augmentation.addition() && rev > scaleUpTriggerRevision && rev <= lastScaleUpRevision) { - finalDataNodes.addAll(augmentation.nodes()); - } - - if (!augmentation.addition() && rev > scaleDownTriggerRevision && rev <= lastScaleDownRevision) { - finalDataNodes.removeAll(augmentation.nodes()); - } - }); - } - - // Apply the filter to get the final data nodes set. - return filterDataNodes(finalDataNodes, zoneDescriptor, distributionZoneManager.nodesAttributes()); - })); - } - - /** - * Return revision of the last event which triggers immediate scale up recalculation of the data nodes value. - */ - private long getRevisionsOfLastScaleUpEvent( - long causalityToken, - int catalogVersion, - int zoneId - ) { - return max( - getLastScaleUpConfigRevision(catalogVersion, zoneId), - getLastScaleUpTopologyRevisions(causalityToken, catalogVersion, zoneId) - ); - } - - /** - * Return revision of the last event which triggers immediate scale down recalculation of the data nodes value. - */ - private long getRevisionsOfLastScaleDownEvent( - long causalityToken, - int catalogVersion, - int zoneId - ) { - return max( - getLastScaleDownConfigRevision(catalogVersion, zoneId), - getLastScaleDownTopologyRevisions(causalityToken, catalogVersion, zoneId) - ); - } - - /** - * Return revision of the last configuration change event which triggers immediate scale up recalculation of the data nodes value. - */ - private long getLastScaleUpConfigRevision( - int catalogVersion, - int zoneId - ) { - return getLastConfigRevision(catalogVersion, zoneId, true); - } - - /** - * Return revision of the last configuration change event which triggers immediate scale down recalculation of the data nodes value. - */ - private long getLastScaleDownConfigRevision( - int catalogVersion, - int zoneId - ) { - return getLastConfigRevision(catalogVersion, zoneId, false); - } - - /** - * Get revision of the latest configuration change event which trigger immediate scale up or scale down recalculation - * of the data nodes value. - */ - private long getLastConfigRevision( - int catalogVersion, - int zoneId, - boolean isScaleUp - ) { - CatalogZoneDescriptor entryNewerCfg = null; - - // Iterate over zone configurations from newest to oldest. - for (int i = catalogVersion; i >= catalogManager.earliestCatalogVersion(); i--) { - CatalogZoneDescriptor entryOlderCfg = catalogManager.zone(zoneId, i); - - if (entryOlderCfg == null) { - break; - } - - if (entryNewerCfg != null) { - if (isScaleUp) { - if (isScaleUpConfigRevision(entryOlderCfg, entryNewerCfg)) { - return entryNewerCfg.updateToken(); - } - } else { - if (isScaleDownConfigRevision(entryOlderCfg, entryNewerCfg)) { - return entryNewerCfg.updateToken(); - } - } - } - - entryNewerCfg = entryOlderCfg; - } - - assert entryNewerCfg != null : "At least one zone configuration must be present ."; - - // The case when there is only one configuration in the history. This configuration corresponds to the zone creation. - return entryNewerCfg.updateToken(); - } - - /** - * Check if newer configuration triggers immediate scale up recalculation of the data nodes value. - * Return true if an older configuration has not immediate scale up and an newer configuration has immediate scale up - * or older and newer configuration have different filter. - */ - private static boolean isScaleUpConfigRevision(CatalogZoneDescriptor olderCfg, CatalogZoneDescriptor newerCfg) { - return olderCfg.dataNodesAutoAdjustScaleUp() != newerCfg.dataNodesAutoAdjustScaleUp() - && newerCfg.dataNodesAutoAdjustScaleUp() == IMMEDIATE_TIMER_VALUE - || !olderCfg.filter().equals(newerCfg.filter()); - } - - /** - * Check if newer configuration triggers immediate scale down recalculation of the data nodes value. - * Return true if an older configuration has not immediate scale down and an newer configuration has immediate scale down. - */ - private static boolean isScaleDownConfigRevision(CatalogZoneDescriptor olderCfg, CatalogZoneDescriptor newerCfg) { - return olderCfg.dataNodesAutoAdjustScaleDown() != newerCfg.dataNodesAutoAdjustScaleDown() - && newerCfg.dataNodesAutoAdjustScaleDown() == IMMEDIATE_TIMER_VALUE; - } - - /** - * Return revision of the last topology change event which triggers immediate scale up recalculation of the data nodes value. - */ - private long getLastScaleUpTopologyRevisions( - long causalityToken, - int catalogVersion, - int zoneId - ) { - return getLastTopologyRevisions(causalityToken, zoneId, catalogVersion, true); - } - - /** - * Return revision of the last topology change event which triggers immediate scale down recalculation of the data nodes value. - */ - private long getLastScaleDownTopologyRevisions( - long causalityToken, - int catalogVersion, - int zoneId - ) { - return getLastTopologyRevisions(causalityToken, zoneId, catalogVersion, false); - } - - /** - * Get revisions of the latest topology event with added nodes and with removed nodes when the zone have - * immediate scale up and scale down timers. - * - * @param causalityToken causalityToken. - * @param zoneId zoneId. - * @return Revisions. - */ - private long getLastTopologyRevisions( - long causalityToken, - int zoneId, - int catalogVersion, - boolean isScaleUp - ) { - Set newerLogicalTopology; - - long newerTopologyRevision; - - Entry topologyEntry = msManager.getLocally(zonesLogicalTopologyKey(), causalityToken); - - if (!topologyEntry.empty()) { - byte[] newerLogicalTopologyBytes = topologyEntry.value(); - - newerLogicalTopology = deserializeLogicalTopologySet(newerLogicalTopologyBytes); - - newerTopologyRevision = topologyEntry.revision(); - - while (true) { - topologyEntry = msManager.getLocally(zonesLogicalTopologyKey(), newerTopologyRevision - 1); - - Set olderLogicalTopology; - - if (topologyEntry.empty()) { - // If older topology is empty then it means that each topology changes were iterated - // so use empty set to compare it with the first topology. - olderLogicalTopology = emptySet(); - } else { - byte[] olderLogicalTopologyBytes = topologyEntry.value(); - - olderLogicalTopology = deserializeLogicalTopologySet(olderLogicalTopologyBytes); - } - - CatalogZoneDescriptor zoneDescriptor = catalogManager.catalog(catalogVersion).zone(zoneId); - - if (zoneDescriptor == null) { - break; - } - - if (isScaleUp) { - if (isScaleUpTopologyRevision(olderLogicalTopology, newerLogicalTopology, zoneDescriptor)) { - return newerTopologyRevision; - } - } else { - if (isScaleDownTopologyRevision(olderLogicalTopology, newerLogicalTopology, zoneDescriptor)) { - return newerTopologyRevision; - } - } - - newerLogicalTopology = olderLogicalTopology; - - newerTopologyRevision = topologyEntry.revision(); - - if (topologyEntry.empty()) { - break; - } - } - } - - return 0; - } - - /** - * Check if newer topology triggers immediate scale up recalculation of the data nodes value. - * Return true if newer logical topology has added nodes and a zone configuration has immediate scale up. - */ - private static boolean isScaleUpTopologyRevision( - Set olderLogicalTopology, - Set newerLogicalTopology, - CatalogZoneDescriptor zoneDescriptor - ) { - return newerLogicalTopology.stream().anyMatch(node -> !olderLogicalTopology.contains(node)) - && zoneDescriptor.dataNodesAutoAdjustScaleUp() == IMMEDIATE_TIMER_VALUE; - } - - /** - * Check if newer topology triggers immediate scale down recalculation of the data nodes value. - * Return true if newer logical topology has removed nodes and a zone configuration has immediate scale down. - */ - private static boolean isScaleDownTopologyRevision( - Set olderLogicalTopology, - Set newerLogicalTopology, - CatalogZoneDescriptor zoneDescriptor - ) { - return olderLogicalTopology.stream().anyMatch(node -> !newerLogicalTopology.contains(node)) - && zoneDescriptor.dataNodesAutoAdjustScaleDown() == IMMEDIATE_TIMER_VALUE; - } - - /** - * Search a value of zoneScaleUpChangeTriggerKey/zoneScaleDownChangeTriggerKey which equals or greater than scaleRevision. - * It iterates over the entries in the local meta storage. If there is an entry with a value equals to or greater than - * the scaleRevision, then it returns the revision of this entry. If there is no such entry then it returns zero. - * - * @param scaleRevision Scale revision. - * @param zoneId Zone id. - * @param triggerKey Trigger key. - * @return Revision. - */ - private long searchTriggerKey(Long scaleRevision, int zoneId, ByteArray triggerKey) { - Entry lastEntry = msManager.getLocally(triggerKey, Long.MAX_VALUE); - - long upperRevision = max(lastEntry.revision(), scaleRevision); - - // Gets old entries from storage to check if the expected value was handled before watch listener was registered. - List entryList = msManager.getLocally(triggerKey.bytes(), scaleRevision, upperRevision); - - for (Entry entry : entryList) { - // scaleRevision is null if the zone was removed. - if (entry.value() == null) { - return entry.revision(); - } else { - long entryScaleRevision = bytesToLongKeepingOrder(entry.value()); - - if (entryScaleRevision >= scaleRevision) { - return entry.revision(); - } - } - } - - return 0; - } - - /** - * Saves revision of the creation a zone to a local state. - * - * @param revision Revision. - * @param zone Zone descriptor. - */ - public void onCreateZoneState(long revision, CatalogZoneDescriptor zone) { - int zoneId = zone.id(); - - zonesCreateRevision.put(zoneId, revision); - } - - /** - * Update zone configuration on a zone dropping. - * - * @param revision Revision. - * @param zoneId Zone id. - */ - public void onDelete(long revision, int zoneId) { - // https://issues.apache.org/jira/browse/IGNITE-20050 - } -} diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java index 26909c6b2ac..4cecca90071 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java @@ -22,7 +22,6 @@ import static org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT; -import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.util.IgniteUtils.closeAll; import static org.apache.ignite.internal.util.IgniteUtils.startAsync; @@ -34,8 +33,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.catalog.CatalogTestUtils; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; @@ -58,7 +55,6 @@ import org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; -import org.apache.ignite.internal.thread.NamedThreadFactory; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -119,16 +115,12 @@ void setUp() throws Exception { catalogManager = createTestCatalogManager(nodeName, clock, metaStorageManager); components.add(catalogManager); - ScheduledExecutorService rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE, - NamedThreadFactory.create(nodeName, "test-rebalance-scheduler", logger())); - distributionZoneManager = new DistributionZoneManager( nodeName, revisionUpdater, metaStorageManager, new LogicalTopologyServiceImpl(topology, cmgManager), catalogManager, - rebalanceScheduler, systemDistributedConfiguration ); diff --git a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java index 03ff94ce261..5754e692011 100644 --- a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java +++ b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java @@ -167,6 +167,14 @@ public interface MetaStorageManager extends IgniteComponent { @Deprecated List getLocally(byte[] key, long revLowerBound, long revUpperBound); + /** + * Returns an entry for the given key locally. See also {@link #getLocally(ByteArray, long)}. + * + * @param key Key. + * @return Entry. + */ + Entry getLocally(ByteArray key); + /** * Returns an entry for the given key and the revision upper bound locally. * diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java index 5a137cd67f6..95ea4f38b96 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java @@ -862,6 +862,11 @@ public List getLocally(byte[] key, long revLowerBound, long revUpperBound return inBusyLock(busyLock, () -> storage.get(key, revLowerBound, revUpperBound)); } + @Override + public Entry getLocally(ByteArray key) { + return inBusyLock(busyLock, () -> storage.get(key.bytes())); + } + @Override public Entry getLocally(ByteArray key, long revUpperBound) { return inBusyLock(busyLock, () -> storage.get(key.bytes(), revUpperBound)); diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java index 972dd217c59..b87fc0cef57 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java @@ -1289,7 +1289,6 @@ public CompletableFuture invoke( metaStorageManager, logicalTopologyService, catalogManager, - rebalanceScheduler, systemDistributedConfiguration ); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index fee0635e903..1abb7444540 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -694,7 +694,6 @@ public CompletableFuture invoke(Condition condition, List su metaStorageMgr, logicalTopologyService, catalogManager, - rebalanceScheduler, systemDistributedConfiguration ) { @Override diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 0e4e5f6e5ad..7d1a8ae2bce 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -916,7 +916,6 @@ public class IgniteImpl implements Ignite { metaStorageMgr, logicalTopologyService, catalogManager, - rebalanceScheduler, systemDistributedConfiguration );