Skip to content

Commit

Permalink
add ResetConsumerGroupOffsetTest based on #58 (#68)
Browse files Browse the repository at this point in the history
* add ResetConsumerGroupOffsetTest

* minor fixes on identation of build.gradle

* enable some group-related tests in ResetConsumerGroupOffsetTest

* eliminate unnecessary differences from upstream

* ci: ignore Java code format check

* fix: check non-exist topics in `waitForAllPartitionsMetadata`

---------

Co-authored-by: YangKian <[email protected]>
  • Loading branch information
Commelina and YangKian authored May 14, 2024
1 parent a2024f1 commit 80e7f96
Show file tree
Hide file tree
Showing 10 changed files with 2,260 additions and 12 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ jobs:

- name: check Java code format
run: |
./gradlew spotlessJavaCheck
# TODO: Upstream kafka code has a different format, ignore this check.
# ./gradlew spotlessJavaCheck
./gradlew spotlessGroovyGradleCheck
pre-tests:
Expand Down
11 changes: 6 additions & 5 deletions app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,12 @@ dependencies {
implementation "org.apache.kafka:kafka-clients:${KAFKA_CLIENT_VERSION}"
implementation "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson"
implementation "com.fasterxml.jackson.core:jackson-databind:$versions.jacksonDatabind"
// XXX: May be used in the future
// implementation "com.fasterxml.jackson.module:jackson-module-scala_$versions.baseScala:$versions.jackson"
// implementation "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$versions.jackson"
// implementation "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson"
// implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:$versions.jackson"
implementation "com.fasterxml.jackson.module:jackson-module-scala_$versions.baseScala:$versions.jackson"
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$versions.jackson"
implementation "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson"
implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:$versions.jackson"

implementation group: 'net.sf.jopt-simple', name: 'jopt-simple', version: '5.0.4'

testImplementation "org.scala-lang:scala-library:$versions.scala"
testImplementation "org.scala-lang.modules:scala-collection-compat_$versions.baseScala:$versions.scalaCollectionCompat"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.util;

import joptsimple.AbstractOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;

public abstract class CommandDefaultOptions {
public final String[] args;
public final OptionParser parser;
public final AbstractOptionSpec<Void> helpOpt;
public final AbstractOptionSpec<Void> versionOpt;
public OptionSet options;

public CommandDefaultOptions(String[] args) {
this(args, false);
}

public CommandDefaultOptions(String[] args, boolean allowCommandOptionAbbreviation) {
this.args = args;
this.parser = new OptionParser(allowCommandOptionAbbreviation);
this.helpOpt = parser.accepts("help", "Print usage information.").forHelp();
this.versionOpt = parser.accepts("version", "Display Kafka version.").forHelp();
this.options = null;
}
}
197 changes: 197 additions & 0 deletions app/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.util;

import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Exit;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;

/**
* Helper functions for dealing with command line utilities.
*/
public class CommandLineUtils {
/**
* Check if there are no options or `--help` option from command line.
*
* @param commandOpts Acceptable options for a command
* @return true on matching the help check condition
*/
public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) {
return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt);
}

/**
* Check if there is `--version` option from command line.
*
* @param commandOpts Acceptable options for a command
* @return true on matching the help check condition
*/
public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) {
return commandOpts.options.has(commandOpts.versionOpt);
}

/**
* Check and print help message if there is no options or `--help` option
* from command line, if `--version` is specified on the command line
* print version information and exit.
*
* @param commandOpts Acceptable options for a command
* @param message Message to display on successful check
*/
public static void maybePrintHelpOrVersion(CommandDefaultOptions commandOpts, String message) {
if (isPrintHelpNeeded(commandOpts)) {
printUsageAndExit(commandOpts.parser, message);
}
if (isPrintVersionNeeded(commandOpts)) {
printVersionAndExit();
}
}

/**
* Check that all the listed options are present.
*/
public static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec<?>... requiredList) {
for (OptionSpec<?> arg : requiredList) {
if (!options.has(arg)) {
printUsageAndExit(parser, String.format("Missing required argument \"%s\"", arg));
}
}
}

/**
* Check that none of the listed options are present.
*/
public static void checkInvalidArgs(OptionParser parser,
OptionSet options,
OptionSpec<?> usedOption,
OptionSpec<?>... invalidOptions) {
if (options.has(usedOption)) {
for (OptionSpec<?> arg : invalidOptions) {
if (options.has(arg)) {
printUsageAndExit(parser, String.format("Option \"%s\" can't be used with option \"%s\"", usedOption, arg));
}
}
}
}

/**
* Check that none of the listed options are present.
*/
public static void checkInvalidArgs(OptionParser parser,
OptionSet options,
OptionSpec<?> usedOption,
Set<OptionSpec<?>> invalidOptions) {
OptionSpec<?>[] array = new OptionSpec<?>[invalidOptions.size()];
invalidOptions.toArray(array);
checkInvalidArgs(parser, options, usedOption, array);
}

/**
* Check that none of the listed options are present with the combination of used options.
*/
public static void checkInvalidArgsSet(OptionParser parser,
OptionSet options,
Set<OptionSpec<?>> usedOptions,
Set<OptionSpec<?>> invalidOptions,
Optional<String> trailingAdditionalMessage) {
if (usedOptions.stream().filter(options::has).count() == usedOptions.size()) {
for (OptionSpec<?> arg : invalidOptions) {
if (options.has(arg)) {
printUsageAndExit(parser, String.format("Option combination \"%s\" can't be used with option \"%s\"%s",
usedOptions, arg, trailingAdditionalMessage.orElse("")));
}
}
}
}

public static void printUsageAndExit(OptionParser parser, String message) {
System.err.println(message);
try {
parser.printHelpOn(System.err);
} catch (IOException e) {
throw new RuntimeException(e);
}
Exit.exit(1, message);
}

public static void printVersionAndExit() {
System.out.println(AppInfoParser.getVersion());
Exit.exit(0);
}

/**
* Parse key-value pairs in the form key=value.
* Value may contain equals sign.
*/
public static Properties parseKeyValueArgs(List<String> args) {
return parseKeyValueArgs(args, true);
}

/**
* Parse key-value pairs in the form key=value.
* Value may contain equals sign.
*/
public static Properties parseKeyValueArgs(List<String> args, boolean acceptMissingValue) {
Properties props = new Properties();
List<String[]> splits = new ArrayList<>();
args.forEach(arg -> {
String[] split = arg.split("=", 2);
if (split.length > 0) {
splits.add(split);
}
});
splits.forEach(split -> {
if (split.length == 1 || (split.length == 2 && (split[1] == null || split[1].isEmpty()))) {
if (acceptMissingValue) {
props.put(split[0], "");
} else {
throw new IllegalArgumentException(String.format("Missing value for key %s}", split[0]));
}
} else {
props.put(split[0], split[1]);
}
});
return props;
}

/**
* Merge the options into {@code props} for key {@code key}, with the following precedence, from high to low:
* 1) if {@code spec} is specified on {@code options} explicitly, use the value;
* 2) if {@code props} already has {@code key} set, keep it;
* 3) otherwise, use the default value of {@code spec}.
* A {@code null} value means to remove {@code key} from the {@code props}.
*/
public static <T> void maybeMergeOptions(Properties props, String key, OptionSet options, OptionSpec<T> spec) {
if (options.has(spec) || !props.containsKey(key)) {
T value = options.valueOf(spec);
if (value == null) {
props.remove(key);
} else {
props.put(key, value.toString());
}
}
}
}
Loading

0 comments on commit 80e7f96

Please sign in to comment.