Skip to content
This repository has been archived by the owner on May 5, 2024. It is now read-only.

feat: Add support for whitelisting topic prefixes #74

Merged
merged 1 commit into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/specification.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ The desired state file consists of:
- **topics** [Optional]:
- **defaults** [Optional]: Specify topic defaults so you don't need to specify them for every topic in the state file. Currently, only replication is supported.
- **blacklist** [Optional]: Add a prefixed topic blacklist for ignoring specific topics when using `kafka-gitops`. This allows topics to be ignored from being deleted if they are not defined in the desired state file.
- **whitelist** [Optional]: Add a prefixed topic whitelist for exclusively handling specific topics when using `kafka-gitops`. This allows topics to be exclusively handled and topics not on the list are being ignored, even if they are not defined in the desired state file.

?> `topics.blacklist` and `topics.whitelist` are _not mutually exclusive_ can be used together to whitelist specific topic prefixes and blacklist individual "sub-topics".

?> The blacklist takes precedence over the whitelist, so if a topic name is matched by both, it will be ignored and not deleted if it was not defined in the desired state file.

**Example**:
```yaml
Expand All @@ -35,6 +40,10 @@ settings:
blacklist:
prefixed:
- _confluent
- my-topics-excluded
whitelist:
prefixed:
- my-topics
```

## Topics
Expand Down
18 changes: 15 additions & 3 deletions src/main/java/com/devshawn/kafka/gitops/StateManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.devshawn.kafka.gitops.domain.state.settings.Settings;
import com.devshawn.kafka.gitops.domain.state.settings.SettingsCCloud;
import com.devshawn.kafka.gitops.domain.state.settings.SettingsTopics;
import com.devshawn.kafka.gitops.domain.state.settings.SettingsTopicsBlacklist;
import com.devshawn.kafka.gitops.domain.state.settings.SettingsTopicsList;
import com.devshawn.kafka.gitops.exception.ConfluentCloudException;
import com.devshawn.kafka.gitops.exception.InvalidAclDefinitionException;
import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
Expand All @@ -39,6 +39,7 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -153,7 +154,8 @@ private void createServiceAccount(String name, List<ServiceAccount> serviceAccou
private DesiredState getDesiredState() {
DesiredStateFile desiredStateFile = getAndValidateStateFile();
DesiredState.Builder desiredState = new DesiredState.Builder()
.addAllPrefixedTopicsToIgnore(getPrefixedTopicsToIgnore(desiredStateFile));
.addAllPrefixedTopicsToIgnore(getPrefixedTopicsToIgnore(desiredStateFile))
.addAllPrefixedTopicsToAccept(getPrefixedTopicsToAccept(desiredStateFile));

generateTopicsState(desiredState, desiredStateFile);

Expand Down Expand Up @@ -297,7 +299,7 @@ private List<String> getPrefixedTopicsToIgnore(DesiredStateFile desiredStateFile
desiredStateFile.getSettings()
.flatMap(Settings::getTopics)
.flatMap(SettingsTopics::getBlacklist)
.map(SettingsTopicsBlacklist::getPrefixed)
.map(SettingsTopicsList::getPrefixed)
.ifPresent(topics::addAll);

desiredStateFile.getServices().forEach((name, service) -> {
Expand All @@ -308,6 +310,16 @@ private List<String> getPrefixedTopicsToIgnore(DesiredStateFile desiredStateFile
return topics;
}

private List<String> getPrefixedTopicsToAccept(DesiredStateFile desiredStateFile) {
return desiredStateFile.getSettings()
.flatMap(Settings::getTopics)
.flatMap(SettingsTopics::getWhitelist)
.map(SettingsTopicsList::getPrefixed)
.stream()
.flatMap(Collection::stream)
.toList();
}

private GetAclOptions buildGetAclOptions(String serviceName) {
return new GetAclOptions.Builder().setServiceName(serviceName).setDescribeAclEnabled(describeAclEnabled).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public interface DesiredState {

List<String> getPrefixedTopicsToIgnore();

List<String> getPrefixedTopicsToAccept();

class Builder extends DesiredState_Builder {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ public interface SettingsTopics {

Optional<SettingsTopicsDefaults> getDefaults();

Optional<SettingsTopicsBlacklist> getBlacklist();
Optional<SettingsTopicsList> getBlacklist();

Optional<SettingsTopicsList> getWhitelist();

class Builder extends SettingsTopics_Builder {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import java.util.List;

@FreeBuilder
@JsonDeserialize(builder = SettingsTopicsBlacklist.Builder.class)
public interface SettingsTopicsBlacklist {
@JsonDeserialize(builder = SettingsTopicsList.Builder.class)
public interface SettingsTopicsList {

List<String> getPrefixed();

class Builder extends SettingsTopicsBlacklist_Builder {
class Builder extends SettingsTopicsList_Builder {
}
}
20 changes: 13 additions & 7 deletions src/main/java/com/devshawn/kafka/gitops/manager/PlanManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,28 @@ public void planTopics(DesiredState desiredState, DesiredPlan.Builder desiredPla
desiredPlan.addTopicPlans(topicPlan.build());
});

topics.forEach(currentTopic -> {
boolean shouldIgnore = desiredState.getPrefixedTopicsToIgnore().stream().anyMatch(it -> currentTopic.name().startsWith(it));
if (shouldIgnore) {
LOG.info("[PLAN] Ignoring topic {} due to prefix", currentTopic.name());
return;
for (TopicListing currentTopic : topics) {
boolean acceptTopic = desiredState.getPrefixedTopicsToAccept().stream().anyMatch(it -> currentTopic.name().startsWith(it));
if (!desiredState.getPrefixedTopicsToAccept().isEmpty() && !acceptTopic) {
LOG.info("[PLAN] Ignoring topic {} due to missing prefix (whitelist)", currentTopic.name());
continue;
}

if (!managerConfig.isDeleteDisabled() && desiredState.getTopics().getOrDefault(currentTopic.name(), null) == null) {
boolean ignoreTopic = desiredState.getPrefixedTopicsToIgnore().stream().anyMatch(it -> currentTopic.name().startsWith(it));
if (ignoreTopic) {
LOG.info("[PLAN] Ignoring topic {} due to prefix (blacklist)", currentTopic.name());
continue;
}

if (!managerConfig.isDeleteDisabled() && !desiredState.getTopics().containsKey(currentTopic.name())) {
TopicPlan topicPlan = new TopicPlan.Builder()
.setName(currentTopic.name())
.setAction(PlanAction.REMOVE)
.build();

desiredPlan.addTopicPlans(topicPlan);
}
});
}
}

private void planTopicConfigurations(String topicName, TopicDetails topicDetails, List<ConfigEntry> configs, TopicPlan.Builder topicPlan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class PlanCommandIntegrationSpec extends Specification {
"seed-topic-modification-no-delete" | true
"seed-acl-exists" | true
"seed-blacklist-topics" | false
"seed-blacklist-whitelist-topics" | false
"seed-whitelist-topics" | false
}

void 'test include unchanged flag - #planName #includeUnchanged'() {
Expand Down
35 changes: 35 additions & 0 deletions src/test/resources/plans/seed-blacklist-whitelist-topics-plan.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"topicPlans": [
{
"name": "new-topic",
"action": "ADD",
"topicDetails": {
"partitions": 6,
"replication": 1,
"configs": {}
},
"topicConfigPlans": []
},
{
"name": "topic-with-configs-1",
"action": "REMOVE",
"topicDetails": null,
"topicConfigPlans": []
}
],
"aclPlans": [
{
"name": "Unnamed ACL",
"aclDetails": {
"name": "test-topic",
"type": "TOPIC",
"pattern": "LITERAL",
"principal": "User:test",
"host": "*",
"operation": "READ",
"permission": "ALLOW"
},
"action": "REMOVE"
}
]
}
14 changes: 14 additions & 0 deletions src/test/resources/plans/seed-blacklist-whitelist-topics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
settings:
topics:
blacklist:
prefixed:
- test
- topic-with-configs-2
whitelist:
prefixed:
- topic-with

topics:
new-topic:
partitions: 6
replication: 1
35 changes: 35 additions & 0 deletions src/test/resources/plans/seed-whitelist-topics-plan.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"topicPlans": [
{
"name": "test-new-topic",
"action": "ADD",
"topicDetails": {
"partitions": 6,
"replication": 1,
"configs": {}
},
"topicConfigPlans": []
},
{
"name": "test-topic",
"action": "REMOVE",
"topicDetails": null,
"topicConfigPlans": []
}
],
"aclPlans": [
{
"name": "Unnamed ACL",
"aclDetails": {
"name": "test-topic",
"type": "TOPIC",
"pattern": "LITERAL",
"principal": "User:test",
"host": "*",
"operation": "READ",
"permission": "ALLOW"
},
"action": "REMOVE"
}
]
}
10 changes: 10 additions & 0 deletions src/test/resources/plans/seed-whitelist-topics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
settings:
topics:
whitelist:
prefixed:
- test

topics:
test-new-topic:
partitions: 6
replication: 1