Skip to content

Commit

Permalink
STORM-1147: Added validation checks and tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
Parth-Brahmbhatt committed Oct 29, 2015
1 parent b6615d5 commit c367fdb
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;
import org.apache.commons.lang.Validate;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.JdbcClient;
import org.slf4j.Logger;
Expand Down Expand Up @@ -52,6 +53,7 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector co
}

public AbstractJdbcBolt(ConnectionProvider connectionProvider) {
Validate.notNull(connectionProvider);
this.connectionProvider = connectionProvider;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import org.apache.commons.lang.Validate;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
Expand All @@ -45,15 +46,23 @@ public class JdbcInsertBolt extends AbstractJdbcBolt {

public JdbcInsertBolt(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) {
super(connectionProvider);

Validate.notNull(jdbcMapper);
this.jdbcMapper = jdbcMapper;
}

public JdbcInsertBolt withTableName(String tableName) {
if (insertQuery != null) {
throw new IllegalArgumentException("You can not specify both insertQuery and tableName.");
}
this.tableName = tableName;
return this;
}

public JdbcInsertBolt withInsertQuery(String insertQuery) {
if (this.tableName != null) {
throw new IllegalArgumentException("You can not specify both insertQuery and tableName.");
}
this.insertQuery = insertQuery;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.commons.lang.Validate;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
Expand All @@ -40,6 +41,10 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {

public JdbcLookupBolt(ConnectionProvider connectionProvider, String selectQuery, JdbcLookupMapper jdbcLookupMapper) {
super(connectionProvider);

Validate.notNull(selectQuery);
Validate.notNull(jdbcLookupMapper);

this.selectQuery = selectQuery;
this.jdbcLookupMapper = jdbcLookupMapper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Values;
import org.apache.commons.lang.Validate;
import org.apache.storm.jdbc.common.Column;

import java.util.ArrayList;
Expand All @@ -33,6 +34,8 @@ public class SimpleJdbcLookupMapper extends SimpleJdbcMapper implements JdbcLook

public SimpleJdbcLookupMapper(Fields outputFields, List<Column> queryColumns) {
super(queryColumns);

Validate.notEmpty(outputFields.toList());
this.outputFields = outputFields;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.storm.jdbc.mapper;

import backtype.storm.tuple.ITuple;
import org.apache.commons.lang.Validate;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.JdbcClient;
Expand All @@ -35,13 +36,17 @@ public class SimpleJdbcMapper implements JdbcMapper {
private List<Column> schemaColumns;

public SimpleJdbcMapper(String tableName, ConnectionProvider connectionProvider) {
Validate.notEmpty(tableName);
Validate.notNull(connectionProvider);

int queryTimeoutSecs = 30;
connectionProvider.prepare();
JdbcClient client = new JdbcClient(connectionProvider, queryTimeoutSecs);
this.schemaColumns = client.getColumnSchema(tableName);
}

public SimpleJdbcMapper(List<Column> schemaColumns) {
Validate.notEmpty(schemaColumns);
this.schemaColumns = schemaColumns;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.jdbc.bolt;

import com.google.common.collect.Lists;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;

/**
* Created by pbrahmbhatt on 10/29/15.
*/
public class JdbcInsertBoltTest {

@Test
public void testValidation() {
ConnectionProvider provider = new HikariCPConnectionProvider(new HashMap<String, Object>());
JdbcMapper mapper = new SimpleJdbcMapper(Lists.newArrayList(new Column("test", 0)));
expectIllegaArgs(null, mapper);
expectIllegaArgs(provider, null);

try {
JdbcInsertBolt bolt = new JdbcInsertBolt(provider, mapper);
bolt.withInsertQuery("test");
bolt.withTableName("test");
Assert.fail("Should have thrown IllegalArgumentException.");
} catch(IllegalArgumentException ne) {
//expected
}

try {
JdbcInsertBolt bolt = new JdbcInsertBolt(provider, mapper);
bolt.withTableName("test");
bolt.withInsertQuery("test");
Assert.fail("Should have thrown IllegalArgumentException.");
} catch(IllegalArgumentException ne) {
//expected
}
}

private void expectIllegaArgs(ConnectionProvider provider, JdbcMapper mapper) {
try {
JdbcInsertBolt bolt = new JdbcInsertBolt(provider, mapper);
Assert.fail("Should have thrown IllegalArgumentException.");
} catch(IllegalArgumentException ne) {
//expected
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.jdbc.bolt;

import backtype.storm.tuple.Fields;
import com.google.common.collect.Lists;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;

/**
* Created by pbrahmbhatt on 10/29/15.
*/
public class JdbcLookupBoltTest {

@Test
public void testValidation() {
ConnectionProvider provider = new HikariCPConnectionProvider(new HashMap<String, Object>());
JdbcLookupMapper mapper = new SimpleJdbcLookupMapper(new Fields("test"), Lists.newArrayList(new Column("test", 0)));
String selectQuery = "select * from dual";
expectIllegaArgs(null, selectQuery, mapper);
expectIllegaArgs(provider, null, mapper);
expectIllegaArgs(provider, selectQuery, null);
}

private void expectIllegaArgs(ConnectionProvider provider, String selectQuery, JdbcLookupMapper mapper) {
try {
JdbcLookupBolt bolt = new JdbcLookupBolt(provider, selectQuery, mapper);
Assert.fail("Should have thrown IllegalArgumentException.");
} catch(IllegalArgumentException ne) {
//expected
}
}

}

0 comments on commit c367fdb

Please sign in to comment.