Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZooKeeper to KRaft migration #9324

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ We also control imports only in production classes and not in tests. This is con
<allow class="io.strimzi.operator.common.Annotations" />
<allow class="io.strimzi.operator.cluster.ClusterOperatorConfig" />
<allow class="io.strimzi.operator.cluster.PlatformFeaturesAvailability" />
<allow class="io.strimzi.operator.cluster.operator.resource.KafkaMetadataStateManager" />
ppatierno marked this conversation as resolved.
Show resolved Hide resolved
<allow class="io.strimzi.operator.cluster.operator.resource.KafkaMetadataConfigurationState" />

<!-- Other Strimzi projects -->
<allow pkg="io.strimzi.kafka.oauth" />
Expand All @@ -102,4 +104,4 @@ We also control imports only in production classes and not in tests. This is con
<allow class="io.strimzi.operator.common.ReconciliationLogger" />
<allow class="io.strimzi.operator.common.Util" />
</subpackage>
</import-control>
</import-control>
5 changes: 5 additions & 0 deletions api/src/main/java/io/strimzi/api/kafka/model/Kafka.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@
description = "The state of the custom resource",
jsonPath = ".status.conditions[?(@.type==\"Ready\")].status",
type = "string"),
@Crd.Spec.AdditionalPrinterColumn(
scholzj marked this conversation as resolved.
Show resolved Hide resolved
name = "Metadata State",
description = "The state of the cluster metadata",
jsonPath = ".status.kafkaMetadataState",
type = "string"),
@Crd.Spec.AdditionalPrinterColumn(
name = "Warnings",
description = "Warnings related to the custom resource",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({ "conditions", "observedGeneration", "listeners", "kafkaNodePools", "clusterId", "operatorLastSuccessfulVersion", "kafkaVersion", "kafkaMetadataVersion" })
@JsonPropertyOrder({ "conditions", "observedGeneration", "listeners", "kafkaNodePools", "clusterId", "operatorLastSuccessfulVersion", "kafkaVersion", "kafkaMetadataVersion", "kafkaMetadataState" })
@EqualsAndHashCode
@ToString(callSuper = true)
public class KafkaStatus extends Status {
Expand All @@ -35,6 +35,7 @@ public class KafkaStatus extends Status {
private String operatorLastSuccessfulVersion;
private String kafkaVersion;
private String kafkaMetadataVersion;
private String kafkaMetadataState;
ppatierno marked this conversation as resolved.
Show resolved Hide resolved

@Description("Addresses of the internal and external listeners")
public List<ListenerStatus> getListeners() {
Expand Down Expand Up @@ -89,4 +90,18 @@ public String getKafkaMetadataVersion() {
/**
 * Sets the KRaft metadata version used by the Kafka cluster.
 *
 * @param kafkaMetadataVersion  the metadata version to store in the status
 */
public void setKafkaMetadataVersion(String kafkaMetadataVersion) {
    this.kafkaMetadataVersion = kafkaMetadataVersion;
}

/**
 * Returns the state describing where the cluster metadata are stored
 * (ZooKeeper, KRaft, or one of the intermediate migration states —
 * see the {@code @Description} annotation for the full list of values).
 *
 * @return  the current Kafka metadata state
 */
@Description("Defines where cluster metadata are stored. Possible values are: " +
        "ZooKeeper if the metadata are stored in ZooKeeper " +
        "KRaftMigration if the controllers are connected to ZooKeeper, together with brokers, and the migration process is running " +
        "KRaftDualWriting if the migration process finished and the cluster is in dual-write mode " +
        "KRaftPostMigration if the brokers are fully KRaft-based but controllers being rolled to disconnect from ZooKeeper " +
        "KRaft if the metadata are stored in KRaft")
public String getKafkaMetadataState() {
    return kafkaMetadataState;
}

/**
 * Sets the state describing where the cluster metadata are stored.
 *
 * @param kafkaMetadataState  the metadata state to store in the status
 */
public void setKafkaMetadataState(String kafkaMetadataState) {
    // Parameter renamed from "metadataState" to match the field name and the
    // naming convention of the sibling setters (e.g. setKafkaMetadataVersion).
    this.kafkaMetadataState = kafkaMetadataState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.strimzi.kafka.oauth.server.ServerConfig;
import io.strimzi.kafka.oauth.server.plain.ServerPlainConfig;
import io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlMetricsReporter;
import io.strimzi.operator.cluster.operator.resource.KafkaMetadataConfigurationState;
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters;
import io.strimzi.operator.common.Reconciliation;

Expand Down Expand Up @@ -68,21 +69,21 @@ public class KafkaBrokerConfigurationBuilder {
private final StringWriter stringWriter = new StringWriter();
private final PrintWriter writer = new PrintWriter(stringWriter);
private final Reconciliation reconciliation;
private final String brokerId;
private final boolean useKRaft;
private final KafkaMetadataConfigurationState kafkaMetadataConfigState;
private final NodeRef node;

/**
* Broker configuration template constructor
*
* @param reconciliation The reconciliation
* @param brokerId The ID of the broker
* @param useKRaft Indicates whether KRaft is used or not
* @param node NodeRef instance
* @param kafkaMetadataConfigState Represents the state of the Kafka metadata configuration
*/
public KafkaBrokerConfigurationBuilder(Reconciliation reconciliation, String brokerId, boolean useKRaft) {
public KafkaBrokerConfigurationBuilder(Reconciliation reconciliation, NodeRef node, KafkaMetadataConfigurationState kafkaMetadataConfigState) {
printHeader();
this.reconciliation = reconciliation;
this.brokerId = brokerId;
this.useKRaft = useKRaft;
this.node = node;
ppatierno marked this conversation as resolved.
Show resolved Hide resolved
this.kafkaMetadataConfigState = kafkaMetadataConfigState;

// Render the node/broker ID into the config file
configureNodeOrBrokerId();
Expand All @@ -94,13 +95,12 @@ public KafkaBrokerConfigurationBuilder(Reconciliation reconciliation, String bro
private void configureNodeOrBrokerId() {
printSectionHeader("Node / Broker ID");

if (useKRaft) {
writer.println("node.id=" + brokerId);
} else {
writer.println("broker.id=" + brokerId);
// Node ID is ignored when not using Kraft mode => but it defaults to broker ID when not set.
// We set it here in the configuration explicitly to avoid never ending rolling updates.
writer.println("node.id=" + brokerId);
// Node ID is ignored when not using Kraft mode => but it defaults to broker ID when not set.
// We set it here in the configuration explicitly to avoid never ending rolling updates.
writer.println("node.id=" + node.nodeId());
// only broker in ZooKeeper-mode or during migration needs the Broker ID to be set
if (node.broker() && kafkaMetadataConfigState.isZooKeeperOrMigration()) {
writer.println("broker.id=" + node.nodeId());
}

writer.println();
Expand Down Expand Up @@ -192,6 +192,20 @@ public KafkaBrokerConfigurationBuilder withZookeeper(String clusterName) {
return this;
}

/**
* Configures ZooKeeper migration related flag
*
* @param isEnabled if ZooKeeper migration is enabled
* @return the builder instance
*/
/**
 * Renders the ZooKeeper-to-KRaft migration flag into the broker configuration.
 *
 * @param isEnabled whether the ZooKeeper metadata migration is enabled
 * @return this builder instance, to allow call chaining
 */
public KafkaBrokerConfigurationBuilder withZooKeeperMigration(boolean isEnabled) {
    printSectionHeader("Zookeeper migration");
    // %b renders the boolean exactly as string concatenation would ("true"/"false")
    writer.println(String.format("zookeeper.metadata.migration.enable=%b", isEnabled));
    writer.println();

    return this;
}

/**
* Adds the KRaft configuration for ZooKeeper-less Kafka. This includes the roles of the broker, the controller
* listener name and the list of all controllers for quorum voting.
Expand All @@ -205,7 +219,9 @@ public KafkaBrokerConfigurationBuilder withZookeeper(String clusterName) {
*/
public KafkaBrokerConfigurationBuilder withKRaft(String clusterName, String namespace, Set<ProcessRoles> kraftRoles, Set<NodeRef> nodes) {
printSectionHeader("KRaft configuration");
writer.println("process.roles=" + kraftRoles.stream().map(role -> role.toValue()).sorted().collect(Collectors.joining(",")));
if (node.controller() || (node.broker() && kafkaMetadataConfigState.isPostMigrationOrKRaft())) {
writer.println("process.roles=" + kraftRoles.stream().map(role -> role.toValue()).sorted().collect(Collectors.joining(",")));
}
writer.println("controller.listener.names=" + CONTROL_PLANE_LISTENER_NAME);

// Generates the controllers quorum list
Expand Down Expand Up @@ -238,6 +254,7 @@ public KafkaBrokerConfigurationBuilder withKRaft(String clusterName, String name
*
* @return Returns the builder instance
*/
@SuppressWarnings({"checkstyle:CyclomaticComplexity"})
public KafkaBrokerConfigurationBuilder withListeners(
String clusterName,
String namespace,
Expand All @@ -250,14 +267,14 @@ public KafkaBrokerConfigurationBuilder withListeners(
List<String> advertisedListeners = new ArrayList<>();
List<String> securityProtocol = new ArrayList<>();

boolean isKraftControllerOnly = useKRaft && node.controller() && !node.broker();
boolean isKraftBrokerOnly = useKRaft && node.broker() && !node.controller();
boolean isKraftControllerOnly = kafkaMetadataConfigState.isKRaftInConfiguration(node) && node.controller() && !node.broker();

// Control Plane listener => ZooKeeper based brokers and KRaft controllers (including mixed nodes). But not on KRaft broker only nodes
if (!isKraftBrokerOnly) {
// Control Plane listener is set for pure KRaft controller or combined node, and broker in ZooKeeper mode or in migration state but not when full KRaft.
if (node.controller() || (node.broker() && kafkaMetadataConfigState.isZooKeeperOrMigration())) {
listeners.add(CONTROL_PLANE_LISTENER_NAME + "://0.0.0.0:9090");

if (!useKRaft) {
// Control Plane listener to be advertised only with broker in ZooKeeper-based or migration but NOT when full KRaft only or mixed
if (node.broker() && kafkaMetadataConfigState.isZooKeeperOrMigration()) {
advertisedListeners.add(String.format("%s://%s:9090",
CONTROL_PLANE_LISTENER_NAME,
// Pod name constructed to be templatable for each individual ordinal
Expand All @@ -277,6 +294,12 @@ public KafkaBrokerConfigurationBuilder withListeners(
// so that brokers are able to connect to controllers as TLS clients
configureControlPlaneListener();

// Replication Listener to be configured on brokers and KRaft controllers only but until post-migration
if (node.broker() || node.controller() && kafkaMetadataConfigState.isZooKeeperOrPostMigration()) {
securityProtocol.add(REPLICATION_LISTENER_NAME + ":SSL");
configureReplicationListener();
}

// Non-controller listeners are used only on ZooKeeper based brokers or KRaft brokers (including mixed nodes)
if (!isKraftControllerOnly) {
// Replication listener
Expand All @@ -286,8 +309,6 @@ public KafkaBrokerConfigurationBuilder withListeners(
// Pod name constructed to be templatable for each individual ordinal
DnsNameGenerator.podDnsNameWithoutClusterDomain(namespace, KafkaResources.brokersServiceName(clusterName), node.podName())
));
securityProtocol.add(REPLICATION_LISTENER_NAME + ":SSL");
configureReplicationListener();

for (GenericKafkaListener listener : kafkaListeners) {
int port = listener.getPort();
Expand Down Expand Up @@ -324,10 +345,13 @@ public KafkaBrokerConfigurationBuilder withListeners(
if (!isKraftControllerOnly) {
writer.println("advertised.listeners=" + String.join(",", advertisedListeners));
writer.println("inter.broker.listener.name=" + REPLICATION_LISTENER_NAME);
} else if (node.controller() && kafkaMetadataConfigState.isZooKeeperOrPostMigration()) {
// needed for KRaft controller only as well until post-migration because it needs to contact brokers
writer.println("inter.broker.listener.name=" + REPLICATION_LISTENER_NAME);
}

// Control plane listener is on all ZooKeeper based brokers, it's not supported in KRaft mode
if (!useKRaft) {
// Control plane listener is on all ZooKeeper based brokers, needed during migration as well, when broker still using ZooKeeper but KRaft controllers are ready
if (node.broker() && kafkaMetadataConfigState.isZooKeeperOrMigration()) {
writer.println("control.plane.listener.name=" + CONTROL_PLANE_LISTENER_NAME);
}

Expand Down Expand Up @@ -631,7 +655,7 @@ public KafkaBrokerConfigurationBuilder withAuthorization(String clusterName, Kaf
superUsers.add(String.format("User:CN=%s,O=io.strimzi", "cluster-operator"));

printSectionHeader("Authorization");
configureAuthorization(clusterName, superUsers, authorization, useKRaft);
configureAuthorization(clusterName, superUsers, authorization, kafkaMetadataConfigState.isKRaftInConfiguration(node));
writer.println("super.users=" + String.join(";", superUsers));
writer.println();
}
Expand All @@ -645,11 +669,11 @@ public KafkaBrokerConfigurationBuilder withAuthorization(String clusterName, Kaf
* @param clusterName Name of the cluster
* @param superUsers Super-users list who have all the rights on the cluster
* @param authorization The authorization configuration from the Kafka CR
* @param useKRaft Use KRaft mode in the configuration
* @param isKRaft Use KRaft mode in the configuration
ppatierno marked this conversation as resolved.
Show resolved Hide resolved
*/
private void configureAuthorization(String clusterName, List<String> superUsers, KafkaAuthorization authorization, boolean useKRaft) {
private void configureAuthorization(String clusterName, List<String> superUsers, KafkaAuthorization authorization, boolean isKRaft) {
if (authorization instanceof KafkaAuthorizationSimple simpleAuthz) {
configureSimpleAuthorization(simpleAuthz, superUsers, useKRaft);
configureSimpleAuthorization(simpleAuthz, superUsers, isKRaft);
} else if (authorization instanceof KafkaAuthorizationOpa opaAuthz) {
configureOpaAuthorization(opaAuthz, superUsers);
} else if (authorization instanceof KafkaAuthorizationKeycloak keycloakAuthz) {
Expand All @@ -664,10 +688,10 @@ private void configureAuthorization(String clusterName, List<String> superUsers,
*
* @param authorization Simple authorization configuration
* @param superUsers Super-users list who have all the rights on the cluster
* @param useKRaft Use KRaft mode in the configuration
* @param isKRaft Use KRaft mode in the configuration
ppatierno marked this conversation as resolved.
Show resolved Hide resolved
*/
private void configureSimpleAuthorization(KafkaAuthorizationSimple authorization, List<String> superUsers, boolean useKRaft) {
if (useKRaft) {
private void configureSimpleAuthorization(KafkaAuthorizationSimple authorization, List<String> superUsers, boolean isKRaft) {
if (isKRaft) {
writer.println("authorizer.class.name=" + KafkaAuthorizationSimple.KRAFT_AUTHORIZER_CLASS_NAME);
} else {
writer.println("authorizer.class.name=" + KafkaAuthorizationSimple.AUTHORIZER_CLASS_NAME);
Expand Down Expand Up @@ -815,7 +839,7 @@ public KafkaBrokerConfigurationBuilder withUserConfiguration(KafkaConfiguration
public KafkaBrokerConfigurationBuilder withLogDirs(List<VolumeMount> mounts) {
// We take all the data mount points and add the broker specific path
String logDirs = mounts.stream()
.map(volumeMount -> volumeMount.getMountPath() + "/kafka-log" + brokerId).collect(Collectors.joining(","));
.map(volumeMount -> volumeMount.getMountPath() + "/kafka-log" + node.nodeId()).collect(Collectors.joining(","));

printSectionHeader("Kafka message logs configuration");
writer.println("log.dirs=" + logDirs);
Expand Down
Loading
Loading