diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java index 15a23452969..0c0cca65767 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java @@ -21,6 +21,7 @@ import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.base.BaseRichBolt; +import org.apache.commons.lang.Validate; import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.common.JdbcClient; import org.slf4j.Logger; @@ -52,6 +53,7 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector co } public AbstractJdbcBolt(ConnectionProvider connectionProvider) { + Validate.notNull(connectionProvider); this.connectionProvider = connectionProvider; } diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java index 2f29000ee6e..c3328f1c57b 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java @@ -20,6 +20,7 @@ import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; +import org.apache.commons.lang.Validate; import org.apache.commons.lang3.StringUtils; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.common.ConnectionProvider; @@ -45,15 +46,23 @@ public class JdbcInsertBolt extends AbstractJdbcBolt { public JdbcInsertBolt(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) { super(connectionProvider); + + Validate.notNull(jdbcMapper); this.jdbcMapper = jdbcMapper; } public JdbcInsertBolt withTableName(String tableName) { + if (insertQuery != null) { + throw new IllegalArgumentException("You can not specify both insertQuery and tableName."); + } this.tableName = tableName; return this; } public JdbcInsertBolt withInsertQuery(String insertQuery) { + if (this.tableName != null) { + throw new IllegalArgumentException("You can not specify both insertQuery and tableName."); + } this.insertQuery = insertQuery; return this; } diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java index 25122e28f0c..b1dadb739af 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java @@ -20,6 +20,7 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +import org.apache.commons.lang.Validate; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.mapper.JdbcLookupMapper; @@ -40,6 +41,10 @@ public class JdbcLookupBolt extends AbstractJdbcBolt { public JdbcLookupBolt(ConnectionProvider connectionProvider, String selectQuery, JdbcLookupMapper jdbcLookupMapper) { super(connectionProvider); + + Validate.notNull(selectQuery); + Validate.notNull(jdbcLookupMapper); + this.selectQuery = selectQuery; this.jdbcLookupMapper = jdbcLookupMapper; } diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java index 5a225522b96..b267bd16d1a 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java @@ -22,6 +22,7 @@ import backtype.storm.tuple.Fields; import backtype.storm.tuple.ITuple; import backtype.storm.tuple.Values; +import org.apache.commons.lang.Validate; import org.apache.storm.jdbc.common.Column; import java.util.ArrayList; @@ -33,6 +34,8 @@ public class SimpleJdbcLookupMapper extends SimpleJdbcMapper implements JdbcLook public SimpleJdbcLookupMapper(Fields outputFields, List queryColumns) { super(queryColumns); + + Validate.notEmpty(outputFields.toList()); this.outputFields = outputFields; } diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java index c4005e3c2b2..9befb1eddf5 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java @@ -18,6 +18,7 @@ package org.apache.storm.jdbc.mapper; import backtype.storm.tuple.ITuple; +import org.apache.commons.lang.Validate; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.common.JdbcClient; @@ -35,6 +36,9 @@ public class SimpleJdbcMapper implements JdbcMapper { private List schemaColumns; public SimpleJdbcMapper(String tableName, ConnectionProvider connectionProvider) { + Validate.notEmpty(tableName); + Validate.notNull(connectionProvider); + int queryTimeoutSecs = 30; connectionProvider.prepare(); JdbcClient client = new JdbcClient(connectionProvider, queryTimeoutSecs); @@ -42,6 +46,7 @@ public SimpleJdbcMapper(String tableName, ConnectionProvider connectionProvider) } public SimpleJdbcMapper(List schemaColumns) { + Validate.notEmpty(schemaColumns); this.schemaColumns = schemaColumns; } diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcInsertBoltTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcInsertBoltTest.java new file mode 100644 index 00000000000..1b393e91f15 --- /dev/null +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcInsertBoltTest.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.bolt; + +import com.google.common.collect.Lists; +import org.apache.storm.jdbc.common.Column; +import org.apache.storm.jdbc.common.ConnectionProvider; +import org.apache.storm.jdbc.common.HikariCPConnectionProvider; +import org.apache.storm.jdbc.mapper.JdbcMapper; +import org.apache.storm.jdbc.mapper.SimpleJdbcMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; + +/** + * Created by pbrahmbhatt on 10/29/15. + */ +public class JdbcInsertBoltTest { + + @Test + public void testValidation() { + ConnectionProvider provider = new HikariCPConnectionProvider(new HashMap()); + JdbcMapper mapper = new SimpleJdbcMapper(Lists.newArrayList(new Column("test", 0))); + expectIllegaArgs(null, mapper); + expectIllegaArgs(provider, null); + + try { + JdbcInsertBolt bolt = new JdbcInsertBolt(provider, mapper); + bolt.withInsertQuery("test"); + bolt.withTableName("test"); + Assert.fail("Should have thrown IllegalArgumentException."); + } catch(IllegalArgumentException ne) { + //expected + } + + try { + JdbcInsertBolt bolt = new JdbcInsertBolt(provider, mapper); + bolt.withTableName("test"); + bolt.withInsertQuery("test"); + Assert.fail("Should have thrown IllegalArgumentException."); + } catch(IllegalArgumentException ne) { + //expected + } + } + + private void expectIllegaArgs(ConnectionProvider provider, JdbcMapper mapper) { + try { + JdbcInsertBolt bolt = new JdbcInsertBolt(provider, mapper); + Assert.fail("Should have thrown IllegalArgumentException."); + } catch(IllegalArgumentException ne) { + //expected + } + } + +} diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java new file mode 100644 index 00000000000..1fda3b1c3f8 --- /dev/null +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.bolt; + +import backtype.storm.tuple.Fields; +import com.google.common.collect.Lists; +import org.apache.storm.jdbc.common.Column; +import org.apache.storm.jdbc.common.ConnectionProvider; +import org.apache.storm.jdbc.common.HikariCPConnectionProvider; +import org.apache.storm.jdbc.mapper.JdbcLookupMapper; +import org.apache.storm.jdbc.mapper.JdbcMapper; +import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper; +import org.apache.storm.jdbc.mapper.SimpleJdbcMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; + +/** + * Created by pbrahmbhatt on 10/29/15. + */ +public class JdbcLookupBoltTest { + + @Test + public void testValidation() { + ConnectionProvider provider = new HikariCPConnectionProvider(new HashMap()); + JdbcLookupMapper mapper = new SimpleJdbcLookupMapper(new Fields("test"), Lists.newArrayList(new Column("test", 0))); + String selectQuery = "select * from dual"; + expectIllegaArgs(null, selectQuery, mapper); + expectIllegaArgs(provider, null, mapper); + expectIllegaArgs(provider, selectQuery, null); + } + + private void expectIllegaArgs(ConnectionProvider provider, String selectQuery, JdbcLookupMapper mapper) { + try { + JdbcLookupBolt bolt = new JdbcLookupBolt(provider, selectQuery, mapper); + Assert.fail("Should have thrown IllegalArgumentException."); + } catch(IllegalArgumentException ne) { + //expected + } + } + +}