Skip to content

Commit

Permalink
feat: add methods to provide additional/finer config params (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
meistermeier authored Nov 15, 2023
1 parent 7e0e9e1 commit 3f41bcf
Show file tree
Hide file tree
Showing 10 changed files with 601 additions and 67 deletions.
14 changes: 13 additions & 1 deletion src/main/java/org/neo4j/cdc/client/model/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class Metadata {
private static final String CONNECTION_SERVER = "connectionServer";
private static final String TX_START_TIME = "txStartTime";
private static final String TX_COMMIT_TIME = "txCommitTime";
private static final String TX_METADATA = "txMetadata";
private static final List<String> KNOWN_KEYS = List.of(
EXECUTING_USER,
AUTHENTICATED_USER,
Expand All @@ -42,7 +43,8 @@ public class Metadata {
CAPTURE_MODE,
SERVER_ID,
TX_COMMIT_TIME,
TX_START_TIME);
TX_START_TIME,
TX_METADATA);

private final String executingUser;
private final String connectionClient;
Expand All @@ -53,6 +55,7 @@ public class Metadata {
private final String connectionServer;
private final ZonedDateTime txStartTime;
private final ZonedDateTime txCommitTime;
private final Map<String, Object> txMetadata;
private final Map<String, Object> additionalEntries;

public Metadata(
Expand All @@ -65,6 +68,7 @@ public Metadata(
String connectionServer,
ZonedDateTime txStartTime,
ZonedDateTime txCommitTime,
Map<String, Object> txMetadata,
Map<String, Object> additionalEntries) {
this.executingUser = executingUser;
this.connectionClient = connectionClient;
Expand All @@ -75,6 +79,7 @@ public Metadata(
this.connectionServer = connectionServer;
this.txStartTime = Objects.requireNonNull(txStartTime);
this.txCommitTime = Objects.requireNonNull(txCommitTime);
this.txMetadata = txMetadata;
this.additionalEntries = additionalEntries;
}

Expand Down Expand Up @@ -118,6 +123,10 @@ public Map<String, Object> getAdditionalEntries() {
return additionalEntries;
}

public Map<String, Object> getTxMetadata() {
return txMetadata;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -168,6 +177,7 @@ public String toString() {
additionalEntries);
}

@SuppressWarnings("unchecked")
public static Metadata fromMap(Map<?, ?> map) {
var cypherMap = ModelUtils.checkedMap(Objects.requireNonNull(map), String.class, Object.class);

Expand All @@ -180,6 +190,7 @@ public static Metadata fromMap(Map<?, ?> map) {
var connectionServer = MapUtils.getString(cypherMap, CONNECTION_SERVER);
var txStartTime = ModelUtils.getZonedDateTime(cypherMap, TX_START_TIME);
var txCommitTime = ModelUtils.getZonedDateTime(cypherMap, TX_COMMIT_TIME);
var txMetadata = (Map<String, Object>) MapUtils.getMap(cypherMap, TX_METADATA);
var unknownEntries = cypherMap.entrySet().stream()
.filter(e -> !KNOWN_KEYS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Expand All @@ -194,6 +205,7 @@ public static Metadata fromMap(Map<?, ?> map) {
connectionServer,
txStartTime,
txCommitTime,
txMetadata,
unknownEntries);
}
}
28 changes: 26 additions & 2 deletions src/main/java/org/neo4j/cdc/client/pattern/NodePattern.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
*/
package org.neo4j.cdc.client.pattern;

import java.util.Collections;
import static java.util.Collections.emptySet;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.jetbrains.annotations.NotNull;
import org.neo4j.cdc.client.model.EntityOperation;
import org.neo4j.cdc.client.selector.NodeSelector;
import org.neo4j.cdc.client.selector.Selector;

Expand All @@ -37,6 +40,12 @@ public class NodePattern implements Pattern {
@NotNull
private final Set<String> excludeProperties;

private final Map<String, Object> metadata = new HashMap<>();

private EntityOperation entityOperation;

private Set<String> changesTo = emptySet();

public NodePattern(
@NotNull Set<String> labels,
@NotNull Map<String, Object> keyFilters,
Expand Down Expand Up @@ -99,6 +108,21 @@ public String toString() {
@Override
public Set<Selector> toSelector() {
return Set.of(new NodeSelector(
null, Collections.emptySet(), labels, keyFilters, includeProperties, excludeProperties));
entityOperation, changesTo, labels, keyFilters, includeProperties, excludeProperties, metadata));
}

@Override
public void withOperation(EntityOperation operation) {
this.entityOperation = operation;
}

@Override
public void withMetadata(Map<String, Object> metadata) {
this.metadata.putAll(metadata);
}

@Override
public void withChangesTo(Set<String> changesTo) {
this.changesTo = changesTo;
}
}
8 changes: 8 additions & 0 deletions src/main/java/org/neo4j/cdc/client/pattern/Pattern.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,21 @@
package org.neo4j.cdc.client.pattern;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.neo4j.cdc.client.model.EntityOperation;
import org.neo4j.cdc.client.selector.Selector;

public interface Pattern {

Set<Selector> toSelector();

void withOperation(EntityOperation operation);

void withMetadata(Map<String, Object> metadata);

void withChangesTo(Set<String> changesTo);

static List<Pattern> parse(String expression) {
return Visitors.parse(expression);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
*/
package org.neo4j.cdc.client.pattern;

import static java.util.Collections.emptySet;

import java.util.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.neo4j.cdc.client.model.EntityOperation;
import org.neo4j.cdc.client.selector.RelationshipNodeSelector;
import org.neo4j.cdc.client.selector.RelationshipSelector;
import org.neo4j.cdc.client.selector.Selector;
Expand All @@ -44,6 +47,12 @@ public class RelationshipPattern implements Pattern {
@NotNull
private final Set<String> excludeProperties;

private final Map<String, Object> metadata = new HashMap<>();

private EntityOperation entityOperation;

private Set<String> changesTo = emptySet();

public RelationshipPattern(
@Nullable String type,
@NotNull NodePattern start,
Expand Down Expand Up @@ -135,27 +144,44 @@ public Set<Selector> toSelector() {
var result = new HashSet<Selector>();

result.add(new RelationshipSelector(
null,
Collections.emptySet(),
entityOperation,
changesTo,
type,
new RelationshipNodeSelector(start.getLabels(), start.getKeyFilters()),
new RelationshipNodeSelector(end.getLabels(), end.getKeyFilters()),
keyFilters,
includeProperties,
excludeProperties));
excludeProperties,
metadata));

if (bidirectional) {
result.add(new RelationshipSelector(
null,
Collections.emptySet(),
entityOperation,
changesTo,
type,
new RelationshipNodeSelector(end.getLabels(), end.getKeyFilters()),
new RelationshipNodeSelector(start.getLabels(), start.getKeyFilters()),
keyFilters,
includeProperties,
excludeProperties));
excludeProperties,
metadata));
}

return result;
}

@Override
public void withOperation(EntityOperation operation) {
this.entityOperation = operation;
}

@Override
public void withMetadata(Map<String, Object> metadata) {
this.metadata.putAll(metadata);
}

@Override
public void withChangesTo(Set<String> changesTo) {
this.changesTo = changesTo;
}
}
64 changes: 61 additions & 3 deletions src/main/java/org/neo4j/cdc/client/selector/EntitySelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,22 @@
*/
package org.neo4j.cdc.client.selector;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;

import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.collections4.MapUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.neo4j.cdc.client.model.*;

public class EntitySelector implements Selector {

public static final String METADATA_KEY_AUTHENTICATED_USER = "authenticatedUser";
public static final String METADATA_KEY_EXECUTING_USER = "executingUser";
public static final String METADATA_KEY_TX_METADATA = "txMetadata";

@Nullable
private final EntityOperation change;

Expand All @@ -37,27 +44,37 @@ public class EntitySelector implements Selector {
@NotNull
private final Set<String> excludeProperties;

@NotNull
private final Map<String, Object> metadata;

public EntitySelector() {
this(null);
}

public EntitySelector(@Nullable EntityOperation change) {
this(change, emptySet());
this(change, emptySet(), emptyMap());
}

public EntitySelector(@Nullable EntityOperation change, @NotNull Set<String> changesTo) {
this(change, changesTo, emptySet(), emptySet());
this(change, changesTo, emptySet(), emptySet(), emptyMap());
}

public EntitySelector(
@Nullable EntityOperation change, @NotNull Set<String> changesTo, @NotNull Map<String, Object> metadata) {
this(change, changesTo, emptySet(), emptySet(), metadata);
}

public EntitySelector(
@Nullable EntityOperation change,
@NotNull Set<String> changesTo,
@NotNull Set<String> includeProperties,
@NotNull Set<String> excludeProperties) {
@NotNull Set<String> excludeProperties,
@NotNull Map<String, Object> metadata) {
this.change = change;
this.changesTo = Objects.requireNonNull(changesTo);
this.includeProperties = Objects.requireNonNull(includeProperties);
this.excludeProperties = Objects.requireNonNull(excludeProperties);
this.metadata = metadata;
}

public @Nullable EntityOperation getChange() {
Expand All @@ -76,6 +93,10 @@ public EntitySelector(
return excludeProperties;
}

public @NotNull Map<String, Object> getMetadata() {
return metadata;
}

@SuppressWarnings("unchecked")
@Override
public boolean matches(ChangeEvent e) {
Expand Down Expand Up @@ -123,6 +144,32 @@ public boolean matches(ChangeEvent e) {
break;
}
}
if (!metadata.isEmpty()) {
Object authenticatedUser = metadata.get(METADATA_KEY_AUTHENTICATED_USER);
if (authenticatedUser != null
&& !e.getMetadata().getAuthenticatedUser().equals(authenticatedUser)) {
return false;
}
Object executingUser = metadata.get(METADATA_KEY_EXECUTING_USER);
if (executingUser != null && !e.getMetadata().getExecutingUser().equals(executingUser)) {
return false;
}
var txMetadata = MapUtils.getMap(metadata, METADATA_KEY_TX_METADATA, emptyMap()).entrySet().stream()
.collect(Collectors.toMap(
entry -> {
if (entry.getKey() instanceof String) {
return (String) entry.getKey();
}

throw new IllegalArgumentException(String.format(
"expected map key to be a String but got '%s'.",
entry.getKey().getClass().getSimpleName()));
},
entry -> (Object) entry.getValue()));
if (!e.getMetadata().getTxMetadata().entrySet().containsAll(txMetadata.entrySet())) {
return false;
}
}

return true;
}
Expand Down Expand Up @@ -218,6 +265,15 @@ public Map<String, Object> asMap() {
if (!changesTo.isEmpty()) {
result.put("changesTo", changesTo);
}
if (metadata.containsKey(METADATA_KEY_AUTHENTICATED_USER)) {
result.put("authenticatedUser", metadata.get(METADATA_KEY_AUTHENTICATED_USER));
}
if (metadata.containsKey(METADATA_KEY_EXECUTING_USER)) {
result.put("executingUser", metadata.get(METADATA_KEY_EXECUTING_USER));
}
if (metadata.containsKey(METADATA_KEY_TX_METADATA)) {
result.put("txMetadata", metadata.get(METADATA_KEY_TX_METADATA));
}

return result;
}
Expand All @@ -232,6 +288,7 @@ public boolean equals(Object o) {
if (change != that.change) return false;
if (!changesTo.equals(that.changesTo)) return false;
if (!includeProperties.equals(that.includeProperties)) return false;
if (!metadata.equals(that.metadata)) return false;
return excludeProperties.equals(that.excludeProperties);
}

Expand All @@ -241,6 +298,7 @@ public int hashCode() {
result = 31 * result + changesTo.hashCode();
result = 31 * result + includeProperties.hashCode();
result = 31 * result + excludeProperties.hashCode();
result = 31 * result + metadata.hashCode();
return result;
}
}
Loading

0 comments on commit 3f41bcf

Please sign in to comment.