Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23466 #5092

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public List<UpdateEntry> get(Catalog catalog) {

CatalogZoneDescriptor descriptor = fromParamsAndPreviousValue(zone);

return List.of(new AlterZoneEntry(descriptor));
return List.of(new AlterZoneEntry(descriptor, zone));
}

private CatalogZoneDescriptor fromParamsAndPreviousValue(CatalogZoneDescriptor previous) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public List<UpdateEntry> get(Catalog catalog) {
zone.consistencyMode()
);

return List.of(new AlterZoneEntry(descriptor));
return List.of(new AlterZoneEntry(descriptor, zone));
}

private void validate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,26 @@ public class AlterZoneEventParameters extends CatalogEventParameters {

private final CatalogZoneDescriptor zoneDescriptor;

private final CatalogZoneDescriptor previousDescriptor;

/**
* Constructor.
*
* @param causalityToken Causality token.
* @param catalogVersion Catalog version.
* @param zoneDescriptor Newly created distribution zone descriptor.
* @param previousDescriptor Previous distribution zone descriptor.
*/
public AlterZoneEventParameters(long causalityToken, int catalogVersion, CatalogZoneDescriptor zoneDescriptor) {
public AlterZoneEventParameters(
long causalityToken,
int catalogVersion,
CatalogZoneDescriptor zoneDescriptor,
CatalogZoneDescriptor previousDescriptor
) {
super(causalityToken, catalogVersion);

this.zoneDescriptor = zoneDescriptor;
this.previousDescriptor = previousDescriptor;
}

/**
Expand All @@ -45,4 +54,11 @@ public AlterZoneEventParameters(long causalityToken, int catalogVersion, Catalog
public CatalogZoneDescriptor zoneDescriptor() {
return zoneDescriptor;
}

/**
* Gets previous distribution zone descriptor.
*/
public CatalogZoneDescriptor previousDescriptor() {
return previousDescriptor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,29 @@ public class AlterZoneEntry implements UpdateEntry, Fireable {

private final CatalogZoneDescriptor descriptor;

private final CatalogZoneDescriptor previousDescriptor;

/**
* Constructs the object.
*
* @param descriptor A descriptor of a zone to alter.
* @param previousDescriptor A previous descriptor of a zone.
*/
public AlterZoneEntry(CatalogZoneDescriptor descriptor) {
public AlterZoneEntry(CatalogZoneDescriptor descriptor, CatalogZoneDescriptor previousDescriptor) {
this.descriptor = descriptor;
this.previousDescriptor = previousDescriptor;
}

/** Returns descriptor of a zone to alter. */
public CatalogZoneDescriptor descriptor() {
return descriptor;
}

/** Returns previous descriptor of a zone. */
public CatalogZoneDescriptor previousDescriptor() {
return previousDescriptor;
}

@Override
public int typeId() {
return MarshallableEntryType.ALTER_ZONE.id();
Expand All @@ -66,7 +75,7 @@ public CatalogEvent eventType() {

@Override
public CatalogEventParameters createEventParameters(long causalityToken, int catalogVersion) {
return new AlterZoneEventParameters(causalityToken, catalogVersion, descriptor);
return new AlterZoneEventParameters(causalityToken, catalogVersion, descriptor, previousDescriptor);
}

@Override
Expand Down Expand Up @@ -97,13 +106,15 @@ private static class AlterZoneEntrySerializer implements CatalogObjectSerializer
@Override
public AlterZoneEntry readFrom(IgniteDataInput input) throws IOException {
CatalogZoneDescriptor descriptor = CatalogZoneDescriptor.SERIALIZER.readFrom(input);
CatalogZoneDescriptor previousDescriptor = CatalogZoneDescriptor.SERIALIZER.readFrom(input);

return new AlterZoneEntry(descriptor);
return new AlterZoneEntry(descriptor, previousDescriptor);
}

@Override
public void writeTo(AlterZoneEntry object, IgniteDataOutput output) throws IOException {
CatalogZoneDescriptor.SERIALIZER.writeTo(object.descriptor(), output);
CatalogZoneDescriptor.SERIALIZER.writeTo(object.previousDescriptor(), output);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void testFunctionCallDefault() throws IOException {
private void checkAlterZoneEntry() {
CatalogStorageProfilesDescriptor profiles =
new CatalogStorageProfilesDescriptor(List.of(new CatalogStorageProfileDescriptor("default")));
UpdateEntry entry1 = new AlterZoneEntry(newCatalogZoneDescriptor("zone1", profiles));
UpdateEntry entry1 = new AlterZoneEntry(newCatalogZoneDescriptor("zone1", profiles), newCatalogZoneDescriptor("zone0", profiles));

VersionedUpdate update = newVersionedUpdate(entry1, entry1);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.lang;

import java.util.Objects;

/**
* Pair of objects.
*
* @param <T> First object.
* @param <V> Second object.
*/
public class Pair<T, V> {
/** First obj. */
private final T first;

/** Second obj. */
private final V second;

/**
* Constructor.
*
* @param first First object.
* @param second Second object.
*/
public Pair(T first, V second) {
this.first = first;
this.second = second;
}

public static <T, V> Pair<T, V> pair(T first, V second) {
return new Pair<>(first, second);
}

/**
* Get the first object.
*/
public T getFirst() {
return first;
}

/**
* Get the second object.
*/
public V getSecond() {
return second;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Pair<?, ?> pair = (Pair<?, ?>) o;
return Objects.equals(first, pair.first) && Objects.equals(second, pair.second);
}

@Override
public int hashCode() {
return Objects.hash(first, second);
}

@Override
public String toString() {
return "Pair [" + first + ", " + second + ']';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
import java.time.LocalTime;
import java.time.Period;
import java.util.BitSet;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* Extended data input.
Expand Down Expand Up @@ -418,4 +422,58 @@ interface Materializer<T> {
*/
T materialize(byte[] buffer, int offset, int length);
}

/**
* Reads a collection.
*
* @param collectionSupplier Supplier to create a collection.
* @param elementReader Function to read an element.
* @return Collection.
*/
default <T, C extends Collection<T>> C readCollection(
Supplier<C> collectionSupplier,
ObjectReader<T> elementReader
) throws IOException {
int size = readVarIntAsInt();

C collection = collectionSupplier.get();

for (int i = 0; i < size; i++) {
collection.add(elementReader.read(this));
}

return collection;
}

/**
* Reads a map.
*
* @param mapSupplier Supplier to create a map.
* @param keyReader Function to read a key.
* @param valueReader Function to read a value.
* @return Map.
*/
default <K, V, VV extends V, M extends Map<K, VV>> M readMap(
Supplier<M> mapSupplier,
ObjectReader<K> keyReader,
ObjectReader<VV> valueReader
) throws IOException {
int size = readVarIntAsInt();

M map = mapSupplier.get();

for (int i = 0; i < size; i++) {
K key = keyReader.read(this);
VV value = valueReader.read(this);

map.put(key, value);
}

return map;
}

@FunctionalInterface
interface ObjectReader<T> {
T read(IgniteDataInput in) throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
import java.time.LocalTime;
import java.time.Period;
import java.util.BitSet;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.function.BiConsumer;

/**
* Extended data output.
Expand Down Expand Up @@ -234,4 +237,40 @@ public interface IgniteDataOutput extends DataOutput {
* @throws IOException if something went wrong
*/
void flush() throws IOException;

/**
* Writes a collection.
*
* @param collection Collection.
* @param elementWriter Element writer.
*/
default <T> void writeCollection(Collection<T> collection, ObjectWriter<T> elementWriter) throws IOException {
writeVarInt(collection.size());

for (T e : collection) {
elementWriter.write(e, this);
}
}

/**
* Writes a map.
*
* @param map Map.
* @param keyWriter Key writer.
* @param valWriter Value writer.
*/
default <K, V> void writeMap(Map<K, V> map, ObjectWriter<K> keyWriter, ObjectWriter<V> valWriter)
throws IOException {
writeVarInt(map.size());

for (Map.Entry<K, V> e : map.entrySet()) {
keyWriter.write(e.getKey(), this);
valWriter.write(e.getValue(), this);
}
}

@FunctionalInterface
interface ObjectWriter<T> {
void write(T obj, IgniteDataOutput out) throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKeyPrefix;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLastHandledTopology;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesRecoverableStateRevision;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.defaultChannelTypeRegistry;
import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
Expand Down Expand Up @@ -74,7 +73,6 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.stream.Stream;
import org.apache.ignite.configuration.validation.Validator;
import org.apache.ignite.internal.BaseIgniteRestartTest;
Expand Down Expand Up @@ -127,7 +125,6 @@
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.worker.fixtures.NoOpCriticalWorkerRegistry;
import org.apache.ignite.network.NetworkAddress;
Expand Down Expand Up @@ -290,16 +287,12 @@ private PartialNode startPartialNode(int idx) {

var catalogManager = new CatalogManagerImpl(new UpdateLogImpl(metastore), new TestClockService(clock, clockWaiter));

ScheduledExecutorService rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
NamedThreadFactory.create(name, "test-rebalance-scheduler", logger()));

DistributionZoneManager distributionZoneManager = new DistributionZoneManager(
name,
revisionUpdater,
metastore,
logicalTopologyService,
catalogManager,
rebalanceScheduler,
clusterCfgMgr.configurationRegistry().getConfiguration(SystemDistributedExtensionConfiguration.KEY).system()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1462,7 +1462,6 @@ private class Node {
metaStorageManager,
logicalTopologyService,
catalogManager,
rebalanceScheduler,
systemDistributedConfiguration
);

Expand Down
Loading