Skip to content

Commit

Permalink
Running system
Browse files Browse the repository at this point in the history
  • Loading branch information
amarflybot committed Apr 22, 2017
1 parent 784a089 commit e0c52d0
Show file tree
Hide file tree
Showing 18 changed files with 168 additions and 154 deletions.
19 changes: 14 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<!--<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jersey</artifactId>
</dependency>
</dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
Expand Down Expand Up @@ -71,20 +71,29 @@
<systemPath>${project.basedir}/src/lib/hive_service.jar</systemPath>
</dependency>


<dependency>
<groupId>com.cloudera.impala.jdbc</groupId>
<artifactId>ImpalaJDBC4</artifactId>
<version>1.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/src/lib/ImpalaJDBC4.jar</systemPath>
<systemPath>${project.basedir}/src/lib/ImpalaJDBC41.jar</systemPath>
</dependency>

<dependency>
<groupId>com.cloudera.impala.jdbc</groupId>
<artifactId>ImpalaJDBC4</artifactId>
<artifactId>httpclient</artifactId>
<version>1.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/src/lib/httpclient-4.1.3.jar</systemPath>
</dependency>

<dependency>
<groupId>com.cloudera.impala.jdbc</groupId>
<artifactId>httpcore</artifactId>
<version>1.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/src/lib/ImpalaJDBC4.jar</systemPath>
<systemPath>${project.basedir}/src/lib/httpcore-4.1.3.jar</systemPath>
</dependency>

<dependency>
Expand Down
Binary file added src/lib/ImpalaJDBC41.jar
Binary file not shown.
Empty file modified src/lib/hive_metastore.jar
100644 → 100755
Empty file.
Empty file modified src/lib/hive_service.jar
100644 → 100755
Empty file.
Binary file added src/lib/httpclient-4.1.3.jar
Binary file not shown.
Binary file added src/lib/httpcore-4.1.3.jar
Binary file not shown.
Empty file modified src/lib/libfb303-0.9.0.jar
100644 → 100755
Empty file.
Empty file modified src/lib/libthrift-0.9.0.jar
100644 → 100755
Empty file.
Empty file modified src/lib/ql.jar
100644 → 100755
Empty file.
17 changes: 6 additions & 11 deletions src/main/java/com/example/AppConfig.java
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
package com.example;

import com.cloudera.impala.jdbc.common.AbstractDataSource;
import com.cloudera.impala.jdbc4.ImpalaJDBC4DataSource;


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;

/**
* Created by amarendra on 21/4/17.
*/
@Configuration
public class AppConfig {

@Bean
public AbstractDataSource dataSource() {
AbstractDataSource dataSource = new ImpalaJDBC4DataSource();
dataSource.setURL("jdbc:impala://quickstart.cloudera:21050/default");
return dataSource;
}

@Bean
JdbcTemplate jdbcTemplate(final AbstractDataSource abstractDataSource){
return new JdbcStream(abstractDataSource);
JdbcTemplate jdbcTemplate(final DataSource dataSource){
return new JdbcStream(dataSource);
}
}
33 changes: 19 additions & 14 deletions src/main/java/com/example/JdbcStream.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package com.example;


import com.cloudera.impala.jdbc.common.AbstractDataSource;



import com.cloudera.jdbc.jdbc41.S41ForwardResultSet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.jdbc.support.rowset.ResultSetWrappingSqlRowSet;
import org.springframework.jdbc.support.rowset.SqlRowSet;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.io.Closeable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.*;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
Expand All @@ -30,7 +31,7 @@
public class JdbcStream extends JdbcTemplate {

@Autowired
public JdbcStream(AbstractDataSource dataSource) {
public JdbcStream(DataSource dataSource) {
super(dataSource);
}

Expand All @@ -47,12 +48,8 @@ public boolean hasNext() {

@Override
public SqlRow next() {
try {
if (resultSet.isLast()) {
throw new NoSuchElementException();
}
} catch (SQLException e) {
throw new RuntimeException("Error handling the resultSet");
if (!rowSet.next()) {
throw new NoSuchElementException();
}
return sqlRow;
}
Expand Down Expand Up @@ -86,12 +83,20 @@ public Stream<SqlRow> stream() throws SQLException {
Supplier<Spliterator<SqlRow>> supplier = () -> Spliterators.spliteratorUnknownSize(new Iterator<SqlRow>() {
@Override
public boolean hasNext() {
return !rowSet.isLast();
return rowSet.next();
}

@Override
public SqlRow next() {
if (rowSet.isLast()) {
ResultSetWrappingSqlRowSet resultSetWrappingSqlRowSet = (ResultSetWrappingSqlRowSet) rowSet;
S41ForwardResultSet resultSet = (S41ForwardResultSet) resultSetWrappingSqlRowSet.getResultSet();
boolean closed = false;
try {
closed = resultSet.isClosed();
} catch (SQLException e) {
e.printStackTrace();
}
if (closed) {
throw new NoSuchElementException();
}
return sqlRow;
Expand Down
31 changes: 23 additions & 8 deletions src/main/java/com/example/SpringBootImpalaApplication.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package com.example;

import com.cloudera.impala.jdbc.common.AbstractDataSource;
import com.cloudera.dsi.dataengine.utilities.TimestampTz;
import com.cloudera.jdbc.common.AbstractDataSource;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.time.LocalDateTime;

@SpringBootApplication
public class SpringBootImpalaApplication {
Expand All @@ -17,16 +20,27 @@ public static void main(String[] args) {
SpringApplication.run(SpringBootImpalaApplication.class, args);
}

@Bean
public CommandLineRunner commandLineRunner(final AbstractDataSource abstractDataSource) {
//@Bean
public CommandLineRunner commandLineRunner(final DataSource abstractDataSource) {
return new CommandLineRunner() {
@Override
public void run(String... strings) throws Exception {
Connection connection = abstractDataSource.getConnection();
Statement statement = connection.createStatement();
PreparedStatement statement = connection.prepareStatement("INSERT INTO WEB_STAT VALUES (?,?,?,?,?,?,?)");
statement.setFetchSize(5);
//statement.executeUpdate("INSERT INTO TABLE WEB_STAT VALUES (cast('NA' as char(2)), 'testDomain', current_timestamp(), 'log', 1, 2, 3)");
ResultSet resultSet = statement.executeQuery("SELECT * FROM WEB_STAT");
statement.setString(1,"NA");
statement.setString(2,"domTest");
statement.setTimestamp(3, TimestampTz.valueOf(LocalDateTime.now()));
statement.setString(4,"testFeature");
statement.setInt(5, 2);
statement.setInt(6, 6);
statement.setInt(7, 7);
statement.execute();
statement.close();
connection.close();
connection = abstractDataSource.getConnection();
Statement statement1 = connection.createStatement();
ResultSet resultSet = statement1.executeQuery("SELECT * FROM WEB_STAT");
long currentTimeMillis = System.currentTimeMillis();
while (resultSet.next()){
System.out.println(resultSet.getInt(1));
Expand All @@ -35,7 +49,8 @@ public void run(String... strings) throws Exception {
long currentTimeMillis1 = System.currentTimeMillis();
System.out.println("Total time taken: "+(currentTimeMillis1-currentTimeMillis));
System.out.println("Done");
statement.close();
connection.commit();
statement1.close();
connection.close();
}
};
Expand Down
71 changes: 21 additions & 50 deletions src/main/java/com/example/WebStatDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
import org.springframework.stereotype.Repository;
import org.springframework.util.Assert;

import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.StreamingOutput;
import java.io.*;
import java.io.PrintWriter;
import java.sql.SQLException;

/**
Expand All @@ -19,62 +17,35 @@ public class WebStatDao {
@Autowired
JdbcStream jdbcStream;

public StreamingOutput getStreamingOutputForSql(String sql, Object[] args) {
JdbcStream.StreamableQuery streamableQuery = null;
try {
streamableQuery = jdbcStream.streamableQuery(sql, args);
} catch (SQLException e) {
e.printStackTrace();
}
StreamingOutput streamingOutput = getStreamingOutput(streamableQuery);
streamableQuery.close();
return streamingOutput;
}

private StreamingOutput getStreamingOutput(final JdbcStream.StreamableQuery streamableQuery) {

public void getStreamingOutputForSql(String sql, Object[] args, PrintWriter writer) throws SQLException {
JdbcStream.StreamableQuery streamableQuery = jdbcStream.streamableQuery(sql, args);
Gson gson = new Gson();

return new StreamingOutput() {
@Override
public void write(OutputStream os) throws IOException, WebApplicationException {
Writer writer = new BufferedWriter(new OutputStreamWriter(os));
writer.write("[");
final boolean[] first = {false};
try {
streamableQuery
.stream()
.map(sqlRow -> {
try {
WebStat webStat = WebStatMapper.mapWebStat(sqlRow);
return webStat;
} catch (RuntimeException e) {
throw new RuntimeException("Cannot convert SqlRom to WebStat");
}
}).reduce((webStat1, webStat2) -> {
Assert.notNull(webStat1, "Webstat cannot be null");
writer.write("[");
final boolean[] first = {false};
try {
streamableQuery
.stream()
.forEach(sqlRow -> {
try {
WebStat webStat = WebStatMapper.mapWebStat(sqlRow);
if (first[0]) {
writer.write(",");
} else if (!first[0]){
writer.write(gson.toJson(webStat1));
writer.write(",");
}
first[0] = true;
writer.write(gson.toJson(webStat2));
writer.write(gson.toJson(webStat));
writer.flush();
//TimeUnit.MILLISECONDS.sleep(500);
} catch (IOException e) {
throw new RuntimeException("Cannot write to Stream back");
//return webStat;
} catch (RuntimeException e) {
throw new RuntimeException("Cannot convert SqlRom to WebStat");
}
return webStat1;
});
} catch (SQLException e) {
e.printStackTrace();
}
writer.write("]");
writer.flush();
}
};
} catch (SQLException e) {
e.printStackTrace();
}
writer.write("]");
writer.flush();
writer.close();
streamableQuery.close();
}
}
14 changes: 7 additions & 7 deletions src/main/java/com/example/WebStatMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ public class WebStatMapper {
public static WebStat mapWebStat(JdbcStream.SqlRow sqlRow){

WebStat webStat = new WebStat();
String host = sqlRow.getString("HOST");
String domain = sqlRow.getString("DOMAIN");
String feature = sqlRow.getString("FEATURE");
Timestamp date = sqlRow.getTimestamp("DATE");
String core = sqlRow.getString("CORE");
String db = sqlRow.getString("DB");
String activeVisitor = sqlRow.getString("ACTIVE_VISITOR");
String host = sqlRow.getString("host");
String domain = sqlRow.getString("dom");
String feature = sqlRow.getString("feat");
Timestamp date = sqlRow.getTimestamp("timestm");
String core = sqlRow.getString("core");
String db = sqlRow.getString("db");
String activeVisitor = sqlRow.getString("active_visitor");

if (host != null){
webStat.setHost(host);
Expand Down
52 changes: 0 additions & 52 deletions src/main/java/com/example/WebStatResource.java

This file was deleted.

Loading

0 comments on commit e0c52d0

Please sign in to comment.