Skip to content

Commit

Permalink
#14 now before a NPE occurs the missing channel name is printed in a …
Browse files Browse the repository at this point in the history
…error logging message.
  • Loading branch information
ewagasow committed Jan 24, 2019
1 parent ecd21fa commit ce464c2
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.pragmaticminds.crunch.api.records.MRecord;
import org.pragmaticminds.crunch.api.trigger.comparator.Supplier;
import org.pragmaticminds.crunch.api.values.dates.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -44,6 +46,7 @@
* Created by Erwin Wagasow on 19.09.2018
*/
class ChannelMapExtractor implements MapExtractor {
private static final Logger logger = LoggerFactory.getLogger(ChannelMapExtractor.class);
private HashMap<Supplier, String> mappings = null;
private ArrayList<Supplier> channels = null;

Expand Down Expand Up @@ -102,10 +105,16 @@ public Map<String, Value> extract(EvaluationContext context) {
supplier -> Value.of(supplier.extract(context.get()))
));
}else if(mappings != null){
return mappings.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getValue,
entry -> Value.of(entry.getKey().extract(context.get()))
));
try {
return mappings.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getValue,
entry -> Value.of(entry.getKey().extract(context.get()))
));
} catch (Exception e) {
logger.error("caught exceptioin!", e);

return null;
}
}
return null;
}
Expand All @@ -114,11 +123,11 @@ public Map<String, Value> extract(EvaluationContext context) {
if (channels == null) {
// Mappings
return mappings.keySet().stream()
.map(identifier -> identifier.getIdentifier())
.map(Supplier::getIdentifier)
.collect(Collectors.toSet());
}
return channels.stream()
.map(identifier -> identifier.getIdentifier())
.map(Supplier::getIdentifier)
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import lombok.NoArgsConstructor;
import org.pragmaticminds.crunch.api.records.MRecord;
import org.pragmaticminds.crunch.api.values.dates.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.Collection;
Expand All @@ -48,6 +50,7 @@
@Builder
@NoArgsConstructor
public class TypedValues implements MRecord, Serializable {
private static final Logger logger = LoggerFactory.getLogger(TypedValues.class);

// Everything Transient, thus it uses custom serializers, see below.
private String source;
Expand All @@ -68,33 +71,62 @@ public TypedValues(String source, long timestamp, Map<String, Value> values) {

@Override
public Double getDouble(String channel) {
return !values.containsKey(channel) ? null : values.get(channel).getAsDouble();
if(!values.containsKey(channel)){
printError(channel);
return null;
}
return values.get(channel).getAsDouble();
}

@Override
public Long getLong(String channel) {
return !values.containsKey(channel) ? null : values.get(channel).getAsLong();
if(!values.containsKey(channel)){
printError(channel);
return null;
}
return values.get(channel).getAsLong();
}

@Override
@SuppressWarnings("squid:S2447") // null should not be returned, in this case it is necessary
public Boolean getBoolean(String channel) {
return !values.containsKey(channel) ? null : values.get(channel).getAsBoolean();
if(!values.containsKey(channel)){
printError(channel);
return null;
}
return values.get(channel).getAsBoolean();
}

@Override
public Date getDate(String channel) {
return !values.containsKey(channel) ? null : values.get(channel).getAsDate();
if(!values.containsKey(channel)){
printError(channel);
return null;
}
return values.get(channel).getAsDate();
}

@Override
public String getString(String channel) {
return !values.containsKey(channel) ? null : values.get(channel).getAsString();
if(!values.containsKey(channel)){
printError(channel);
return null;
}
return values.get(channel).getAsString();
}

@Override
public Value getValue(String channel) {
if(!values.containsKey(channel)){
printError(channel);
return null;
}
return values.getOrDefault(channel, null);
}

private void printError(String channel) {
logger.error("Channel with the name \"{}\" is not present!", channel);
}

/**
* This method should not be used on this class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import lombok.NoArgsConstructor;
import org.pragmaticminds.crunch.api.records.MRecord;
import org.pragmaticminds.crunch.api.values.dates.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.stream.Collectors;
Expand All @@ -46,6 +48,7 @@
@EqualsAndHashCode
@NoArgsConstructor
public class UntypedValues implements MRecord {
private static final Logger logger = LoggerFactory.getLogger(UntypedValues.class);

private String source;
private long timestamp;
Expand All @@ -64,42 +67,68 @@ public UntypedValues(
@Override
public Double getDouble(String channel) {
Value v = getValue(channel);
return v == null ? null : v.getAsDouble();
if (v == null) {
printError(channel);
return null;
}
return v.getAsDouble();
}

@Override
public Long getLong(String channel) {
Value v = getValue(channel);
return v == null ? null : v.getAsLong();
if (v == null) {
printError(channel);
return null;
}
return v.getAsLong();
}

@Override
@SuppressWarnings("squid:S2447") // null should not be returned, in this case it is necessary
public Boolean getBoolean(String channel) {
Value v = getValue(channel);
return v == null ? null : v.getAsBoolean();
if (v == null) {
printError(channel);
return null;
}
return v.getAsBoolean();
}

@Override
public Date getDate(String channel) {
Value v = getValue(channel);
return v == null ? null : v.getAsDate();
if (v == null) {
printError(channel);
return null;
}
return v.getAsDate();
}

@Override
public String getString(String channel) {
Value v = getValue(channel);
return v == null ? null : v.getAsString();
if (v == null) {
printError(channel);
return null;
}
return v.getAsString();
}

@Override
public Value getValue(String channel) {
Object v = get(channel);
return v == null ? null : Value.of(v);
if (v == null) {
printError(channel);
return null;
}
return Value.of(v);
}

@Override
public Object get(String channel) {
if (!values.containsKey(channel)) {
printError(channel);
return null;
}
return values.get(channel);
Expand All @@ -110,6 +139,10 @@ public Object get(String channel) {
public Collection<String> getChannels() {
return this.values.keySet();
}

private void printError(String channel) {
logger.error("Channel with the name \"{}\" is not present!", channel);
}

/**
* converts UnTyped-Values to typed values
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.pragmaticminds.crunch.api.trigger.extractor;

import com.google.common.collect.ImmutableMap;
import org.junit.Test;
import org.pragmaticminds.crunch.api.pipe.EvaluationContext;
import org.pragmaticminds.crunch.api.pipe.SimpleEvaluationContext;
import org.pragmaticminds.crunch.api.records.MRecord;
import org.pragmaticminds.crunch.api.trigger.comparator.Supplier;
import org.pragmaticminds.crunch.api.trigger.comparator.Suppliers;
import org.pragmaticminds.crunch.api.values.UntypedValues;
import org.pragmaticminds.crunch.api.values.dates.Value;

import java.util.HashMap;
import java.util.Map;

/**
* @author erwin.wagasow
* created by erwin.wagasow on 24.01.2019
*/
public class BugTicket14MT {

/**
* A {@link NullPointerException} appeared when not all mapped channels where present in the record.
*/
@Test
public void nullPointerExceptionInChannelMapExtractorExtract() {

// create channel mappings for the extractor
Map<Supplier, String> mapping = new HashMap<>();
mapping.put(Suppliers.ChannelExtractors.stringChannel("channel1"), "new_channel1");
mapping.put(Suppliers.ChannelExtractors.stringChannel("channel2"), "new_channel2");
mapping.put(Suppliers.ChannelExtractors.stringChannel("channel3"), "new_channel3");
mapping.put(Suppliers.ChannelExtractors.stringChannel("channel4"), "new_channel4");
mapping.put(Suppliers.ChannelExtractors.stringChannel("channel5"), "new_channel5");

// create the extractor
ChannelMapExtractor extractor = new ChannelMapExtractor(mapping);

Map<String, Object> values = ImmutableMap.of(
"channel1", Value.of("test1"),
"channel2", Value.of("test2"),
"channel3", Value.of("test3"),
"channel4", Value.of("test4")
);

// build a record, that does not hold all necessary channels
MRecord record = UntypedValues.builder()
.values(values)
.build();
EvaluationContext context = new SimpleEvaluationContext(record);
Map<String, Value> extractResult = extractor.extract(context);

// now the missing channel names are printed before the NullPointerException occurs
}
}

0 comments on commit ce464c2

Please sign in to comment.