Skip to content

Commit

Permalink
Merge pull request #50 from zendesk/clean_old_schemas
Browse files Browse the repository at this point in the history
Clean old schemas
  • Loading branch information
Ben Osheroff committed May 1, 2015
2 parents 743dd26 + 94bf25a commit 246b702
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 10 deletions.
4 changes: 2 additions & 2 deletions docs/docs/quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'
You'll need a version 7 of a JVM.

```
curl -sLo - https://github.com/zendesk/maxwell/releases/download/v0.6.3/maxwell-0.6.3.tar.gz \
curl -sLo - https://github.com/zendesk/maxwell/releases/download/v0.7.0/maxwell-0.7.0.tar.gz \
| tar zxvf -
cd maxwell-0.6.3
cd maxwell-0.7.0
```


Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.zendesk</groupId>
<artifactId>maxwell</artifactId>
<version>0.6.3</version>
<version>0.7.0</version>
<packaging>jar</packaging>

<name>maxwell</name>
Expand Down
41 changes: 34 additions & 7 deletions src/main/java/com/zendesk/maxwell/schema/SchemaStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.zendesk.maxwell.schema.ddl.SchemaSyncError;

public class SchemaStore {
private final int SCHEMAS_TO_KEEP = 5;
private final Connection connection;
private Schema schema;
private BinlogPosition position;
Expand Down Expand Up @@ -55,7 +56,12 @@ public SchemaStore(Connection connection, Schema schema, BinlogPosition position
this.position = position;
}

private static Integer executeInsert(PreparedStatement preparedStatement,
public SchemaStore(Connection connection, Long schema_id) throws SQLException {
this(connection);
this.schema_id = schema_id;
}

private static Long executeInsert(PreparedStatement preparedStatement,
Object... values) throws SQLException {
for (int i = 0; i < values.length; i++) {
preparedStatement.setObject(i + 1, values[i]);
Expand All @@ -65,7 +71,7 @@ private static Integer executeInsert(PreparedStatement preparedStatement,
ResultSet rs = preparedStatement.getGeneratedKeys();

if (rs.next()) {
return rs.getInt(1);
return rs.getLong(1);
} else
return null;
}
Expand All @@ -76,25 +82,29 @@ public void save() throws SQLException {

try {
connection.setAutoCommit(false);
saveSchema();
this.schema_id = saveSchema();
connection.commit();
} finally {
connection.setAutoCommit(true);
}

if ( this.schema_id != null ) {
deleteOldSchemas(schema_id);
}
}


public void saveSchema() throws SQLException {
Integer schemaId = executeInsert(schemaInsert, position.getFile(),
public Long saveSchema() throws SQLException {
Long schemaId = executeInsert(schemaInsert, position.getFile(),
position.getOffset(), 1, schema.getEncoding());

ArrayList<Object> columnData = new ArrayList<Object>();

for (Database d : schema.getDatabases()) {
Integer dbId = executeInsert(databaseInsert, schemaId, d.getName(), d.getEncoding());
Long dbId = executeInsert(databaseInsert, schemaId, d.getName(), d.getEncoding());

for (Table t : d.getTableList()) {
Integer tableId = executeInsert(tableInsert, schemaId, dbId, t.getName(), t.getEncoding(), t.getPKString());
Long tableId = executeInsert(tableInsert, schemaId, dbId, t.getName(), t.getEncoding(), t.getPKString());


for (ColumnDef c : t.getColumnList()) {
Expand All @@ -121,6 +131,8 @@ public void saveSchema() throws SQLException {
}
if ( columnData.size() > 0 )
executeColumnInsert(columnData);

return schemaId;
}

private void executeColumnInsert(ArrayList<Object> columnData) throws SQLException {
Expand Down Expand Up @@ -297,9 +309,24 @@ public void destroy() throws SQLException {
}
}

public boolean schemaExists(long schema_id) throws SQLException {
if ( this.schema_id == null )
return false;

ResultSet rs = connection.createStatement().executeQuery("select id from maxwell.schemas where id = " + schema_id);
return rs.next();
}

public BinlogPosition getBinlogPosition() {
return this.position;
}

private void deleteOldSchemas(Long currentSchemaId) throws SQLException {
Long toDelete = currentSchemaId - SCHEMAS_TO_KEEP;
while ( toDelete > 0 && schemaExists(toDelete) ) {
new SchemaStore(connection, toDelete).destroy();
toDelete--;
}
}

}

0 comments on commit 246b702

Please sign in to comment.