Skip to content

Commit

Permalink
Incremental mode for CSV files
Browse files Browse the repository at this point in the history
  • Loading branch information
osalvador committed Jan 22, 2019
1 parent ce4fe77 commit 07f250a
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 81 deletions.
10 changes: 5 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@
</dependency>

<!-- Oracle -->
<dependency>
<groupId>com.oracle.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>12.2.0.1</version>
</dependency>
<!--<dependency>-->
<!--<groupId>com.oracle.jdbc</groupId>-->
<!--<artifactId>ojdbc8</artifactId>-->
<!--<version>12.2.0.1</version>-->
<!--</dependency>-->
<!--<dependency>-->
<!--<groupId>com.oracle.jdbc</groupId>-->
<!--<artifactId>orai18n</artifactId>-->
Expand Down
157 changes: 82 additions & 75 deletions src/main/java/org/replicadb/manager/CsvManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import de.siegmar.fastcsv.writer.CsvWriter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.replicadb.cli.ReplicationMode;
import org.replicadb.cli.ToolOptions;

import java.io.*;
Expand All @@ -20,15 +21,14 @@
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Properties;
import java.util.Random;

public class CsvManager extends SqlManager {

private static final Logger LOG = LogManager.getLogger(CsvManager.class.getName());

// String array with the paths of the temporary files (one entry per job/task)
private static String[] tempFilesPath;

/**
Expand All @@ -39,7 +39,7 @@ public class CsvManager extends SqlManager {
public CsvManager(ToolOptions opts, DataSourceType dsType) {
super(opts);
this.dsType = dsType;
// Fixed size
// Fixed size String Array
tempFilesPath = new String[options.getJobs()];
}

Expand All @@ -60,61 +60,72 @@ public String getDriverClass() {
}

@Override
public int insertDataToTable(ResultSet resultSet, int taskId) throws SQLException, IOException, URISyntaxException {
public int insertDataToTable(ResultSet resultSet, int taskId) throws Exception {

try {
ResultSetMetaData rsmd = resultSet.getMetaData();
int columnsNumber = rsmd.getColumnCount();

ResultSetMetaData rsmd = resultSet.getMetaData();
int columnsNumber = rsmd.getColumnCount();
// Temporal file name
String randomFileUrl = options.getSinkConnect() + ".repdb." + (new Random().nextInt(1000) + 9000);
LOG.info("Temp CSV file path: " + randomFileUrl);

String randomFileUrl = options.getSinkConnect() + ".repdb." + (new Random().nextInt(1000) + 9000);
LOG.debug("CSV path: " + randomFileUrl);
tempFilesPath[taskId] = randomFileUrl;
// Save the path of temp file
tempFilesPath[taskId] = randomFileUrl;

File file = getFileFromPathString(randomFileUrl);
File file = getFileFromPathString(randomFileUrl);

CsvWriter csvWriter = new CsvWriter();
setCsvWriterOptions(csvWriter);
CsvWriter csvWriter = new CsvWriter();
// Custom user csv options
setCsvWriterOptions(csvWriter);

try (CsvAppender csvAppender = csvWriter.append(file, StandardCharsets.UTF_8)) {
// Write the CSV
try (CsvAppender csvAppender = csvWriter.append(file, StandardCharsets.UTF_8)) {

// headers, only in the first temporal file.
if (taskId == 0 && Boolean.valueOf(options.getSinkConnectionParams().getProperty("Header"))) {
for (int i = 1; i <= columnsNumber; i++) {
csvAppender.appendField(rsmd.getColumnName(i));
}
csvAppender.endLine();
// headers, only in the first temporal file.
if (taskId == 0 && Boolean.valueOf(options.getSinkConnectionParams().getProperty("Header"))) {
for (int i = 1; i <= columnsNumber; i++) {
csvAppender.appendField(rsmd.getColumnName(i));
}
csvAppender.endLine();
}

String colValue;
String[] colValues; //= new String[columnsNumber];
String colValue;
String[] colValues;

// lines
while (resultSet.next()) {
colValues = new String[columnsNumber];
// lines
while (resultSet.next()) {
colValues = new String[columnsNumber];

// Iterate over the columns of the row
for (int i = 1; i <= columnsNumber; i++) {
colValue = resultSet.getString(i);
// Iterate over the columns of the row
for (int i = 1; i <= columnsNumber; i++) {
colValue = resultSet.getString(i);

if (!this.options.isSinkDisableEscape() && !resultSet.wasNull())
colValues[i - 1] = colValue.replace("\n", "\\n").replace("\r", "\\r");
else
colValues[i - 1] = colValue;
}
csvAppender.appendLine(colValues);
if (!this.options.isSinkDisableEscape() && !resultSet.wasNull())
colValues[i - 1] = colValue.replace("\n", "\\n").replace("\r", "\\r");
else
colValues[i - 1] = colValue;
}
csvAppender.appendLine(colValues);
}

} catch (Exception e) {
throw e;
}

return 0;
}

/**
* Retrieves and sets the custom options that define the CSV Writer
*
* @param writer
*/
private void setCsvWriterOptions(CsvWriter writer) {
Properties fileProperties = options.getSinkConnectionParams();

// Header option is not supported on incremental mode
if (Boolean.valueOf(options.getSinkConnectionParams().getProperty("Header"))
&& options.getMode().equals(ReplicationMode.INCREMENTAL.getModeText())) {
throw new IllegalArgumentException("Header option is not supported on incremental mode");
}

String fieldSeparator, textDelimiter, lineDelimiter;

fieldSeparator = fileProperties.getProperty("FieldSeparator");
Expand Down Expand Up @@ -147,48 +158,42 @@ private void setCsvWriterOptions(CsvWriter writer) {


@Override
protected void createStagingTable() throws SQLException {

protected void createStagingTable() {
}

@Override
protected void mergeStagingTable() {

try {

//
// URL finalFile = new URL(options.getSinkConnect());
// LOG.debug("Final File PATH: " + finalFile.getPath());
// LOG.debug("Final File FILE: " + finalFile.getFile());
//
//
//
// LOG.debug("Final File URI: " + finalFile.toURI());

/**
* Merging temporal files
*/
protected void mergeStagingTable() throws Exception {

File finalFile = getFileFromPathString(options.getSinkConnect());
File firstTemporalFile = getFileFromPathString(tempFilesPath[0]);
Path firstTemporalFilePath = Paths.get(firstTemporalFile.getPath());
File finalFile = getFileFromPathString(options.getSinkConnect());
File firstTemporalFile = getFileFromPathString(tempFilesPath[0]);
Path firstTemporalFilePath = Paths.get(firstTemporalFile.getPath());

int tempFilesIdx = 0;
if (!options.getMode().equals(ReplicationMode.INCREMENTAL.getModeText())) {
// Rename first temporal file to the final file
Files.move(firstTemporalFilePath, firstTemporalFilePath.resolveSibling(finalFile.getPath()), StandardCopyOption.REPLACE_EXISTING);
tempFilesIdx = 1;
LOG.info("Complete mode: creating and merging all temp files into: " + finalFile.getPath());
} else {
LOG.info("Incremental mode: appending and merging all temp files into: " + finalFile.getPath());
}

// Append the remaining temporary files to the final file
try (FileChannel finalFileChannel = new FileOutputStream(finalFile, true).getChannel()) {

// Channel for append to the final file
FileChannel finalFileChannel = new FileOutputStream(finalFile, true).getChannel();

for (int i = 1; i <= tempFilesPath.length - 1; i++) {
// Starts at tempFilesIdx: 1 in complete mode (the first temp file was already renamed to the final file), 0 in incremental mode.
for (int i = tempFilesIdx; i <= tempFilesPath.length - 1; i++) {
// Temp file channel
FileChannel tempFileChannel = new FileInputStream(getFileFromPathString(tempFilesPath[i])).getChannel();
// Append temp file to final file
finalFileChannel.transferFrom(tempFileChannel, finalFileChannel.size(), tempFileChannel.size());
tempFileChannel.close();
boolean isDeleted = getFileFromPathString(tempFilesPath[i]).delete();
LOG.debug("isDeleted:"+isDeleted);
// Delete temp file
getFileFromPathString(tempFilesPath[i]).delete();
}

finalFileChannel.close();

} catch (IOException | URISyntaxException e) {
e.printStackTrace();
}

}
Expand All @@ -201,26 +206,30 @@ public void postSourceTasks() {/*Not implemented*/}


@Override
public void postSinkTasks() throws IOException {
LOG.debug(Arrays.toString(tempFilesPath));
public void postSinkTasks() throws Exception {
// Always merge data
this.mergeStagingTable();
}

@Override
public void cleanUp() throws Exception {
// Make sure every temporary file is deleted
for (int i = 0; i <= tempFilesPath.length - 1; i++){
boolean isDeleted = getFileFromPathString(tempFilesPath[i]).delete();
LOG.debug("isDeleted:"+isDeleted);
}
for (int i = 0; i <= tempFilesPath.length - 1; i++) getFileFromPathString(tempFilesPath[i]).delete();
}


/**
* Returns a File instance for the given URL string of the CSV path.
* Provides compatibility with Windows, Linux and Mac URL strings.
*
* @param urlString
* @return
* @throws MalformedURLException
* @throws URISyntaxException
*/
private File getFileFromPathString(String urlString) throws MalformedURLException, URISyntaxException {

URL url = new URL(urlString);

URI uri = url.toURI();

if (uri.getAuthority() != null && uri.getAuthority().length() > 0) {
Expand All @@ -229,8 +238,6 @@ private File getFileFromPathString(String urlString) throws MalformedURLExceptio
}

File file = new File(uri);
LOG.debug("File is: " + file.toString());

return file;
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/replicadb/manager/JdbcDrivers.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public enum JdbcDrivers {
JTDS_SQLSERVER("net.sourceforge.jtds.jdbc.Driver", "jdbc:jtds:sqlserver:"),
DB2("com.ibm.db2.jcc.DB2Driver", "jdbc:db2:"), NETEZZA("org.netezza.Driver", "jdbc:netezza:"),
CUBRID("cubrid.jdbc.driver.CUBRIDDriver", "jdbc:cubrid:"), DENODO("com.denodo.vdp.jdbc.Driver", "jdbc:vdb:"),
CSV("", "file:");
CSV(null, "file:");

private final String driverClass;
private final String schemePrefix;
Expand Down

0 comments on commit 07f250a

Please sign in to comment.