Skip to content

Commit

Permalink
[GOBBLIN-1864] Catch exceptions if specific watermark calculation fai…
Browse files Browse the repository at this point in the history
…ls (apache#3728)

* Catch exceptions if specific watermark calculation fails

* Add null handling for watermark map
  • Loading branch information
jack-moseley authored and phet committed Aug 15, 2023
1 parent d885952 commit ae5e072
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.gobblin.completeness.verifier;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -125,10 +126,13 @@ public Map<CompletenessType, Boolean> calculateCompleteness(String datasetName,
countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));

Map<CompletenessType, Boolean> result = new HashMap<>();
result.put(CompletenessType.ClassicCompleteness, calculateCompleteness(datasetName, beginInMillis, endInMillis,
CompletenessType.ClassicCompleteness, countsByTier) > threshold);
result.put(CompletenessType.TotalCountCompleteness, calculateCompleteness(datasetName, beginInMillis, endInMillis,
CompletenessType.TotalCountCompleteness, countsByTier) > threshold);
Arrays.stream(CompletenessType.values()).forEach(type -> {
try {
result.put(type, calculateCompleteness(datasetName, beginInMillis, endInMillis, type, countsByTier) > threshold);
} catch (IOException e) {
log.error("Failed to calculate completeness for type " + type, e);
}
});
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,28 @@ public void testEmptyAuditCount() throws IOException {
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
}

public void testOneCountFailed() throws IOException {
final String topic = "testTopic";
State props = new State();
props.setProp(KafkaAuditCountVerifier.SOURCE_TIER, SOURCE_TIER);
props.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, REFERENCE_TIERS);
props.setProp(KafkaAuditCountVerifier.TOTAL_COUNT_REFERENCE_TIERS, TOTAL_COUNT_REFERENCE_TIERS);
props.setProp(KafkaAuditCountVerifier.THRESHOLD, ".99");
props.setProp(KafkaAuditCountVerifier.COMPLETE_ON_NO_COUNTS, true);
TestAuditClient client = new TestAuditClient(props);
KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(props, client);

// Missing total count tier which will throw exception
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 999L,
REFERENCE_TIERS, 1000L
));

// Classic completeness is still returned, but total is missing
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
.containsKey(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public ClassicWatermarkUpdater(long previousWatermark, String timeZone,
@Override
protected void computeAndUpdateInternal(Map<KafkaAuditCountVerifier.CompletenessType, Boolean> results,
ZonedDateTime timestampDT) {
if (!results.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness)) {
if (!results.getOrDefault(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness, false)) {
return;
}

Expand Down Expand Up @@ -261,7 +261,7 @@ public TotalCountWatermarkUpdater(long previousWatermark, String timeZone,
@Override
protected void computeAndUpdateInternal(Map<KafkaAuditCountVerifier.CompletenessType, Boolean> results,
ZonedDateTime timestampDT) {
if (!results.get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness)) {
if (!results.getOrDefault(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness, false)) {
return;
}

Expand Down

0 comments on commit ae5e072

Please sign in to comment.