Skip to content

Commit

Permalink
Merge pull request #151 from zendesk/igor/include_mysql_db
Browse files Browse the repository at this point in the history
Maxwell should capture `mysql` database schema
  • Loading branch information
Ben Osheroff committed Nov 20, 2015
2 parents ae4b407 + ba0e707 commit f1b6302
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 16 deletions.
4 changes: 4 additions & 0 deletions src/main/java/com/zendesk/maxwell/schema/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public Database findDatabase(String string) {
return null;
}

public void addDatabase(Database d) {
this.databases.add(d);
}

public Schema copy() {
ArrayList<Database> newDBs = new ArrayList<>();
for ( Database d : this.databases ) {
Expand Down
14 changes: 6 additions & 8 deletions src/main/java/com/zendesk/maxwell/schema/SchemaCapturer.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -20,23 +21,20 @@ public class SchemaCapturer {
private final Connection connection;
static final Logger LOGGER = LoggerFactory.getLogger(SchemaStore.class);

private final String[] alwaysExclude = {"performance_schema", "information_schema", "mysql"};
private final HashSet<String> excludeDatabases;
public static final HashSet<String> IGNORED_DATABASES = new HashSet<String>(
Arrays.asList(new String[] {"performance_schema", "information_schema"})
);

private final HashSet<String> includeDatabases;

private final PreparedStatement infoSchemaStmt;
private final String INFORMATION_SCHEMA_SQL =
"SELECT * FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA` = ? AND `TABLE_NAME` = ?";

public SchemaCapturer(Connection c) throws SQLException {
this.excludeDatabases = new HashSet<String>();
this.includeDatabases = new HashSet<String>();
this.connection = c;
this.infoSchemaStmt = connection.prepareStatement(INFORMATION_SCHEMA_SQL);

for (String s : alwaysExclude) {
this.excludeDatabases.add(s);
}
}

public SchemaCapturer(Connection c, String dbName) throws SQLException {
Expand All @@ -58,7 +56,7 @@ public Schema capture() throws SQLException, SchemaSyncError {
if ( includeDatabases.size() > 0 && !includeDatabases.contains(dbName))
continue;

if ( excludeDatabases.contains(dbName) )
if ( IGNORED_DATABASES.contains(dbName) )
continue;

databases.add(captureDatabase(dbName, encoding));
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/com/zendesk/maxwell/schema/SchemaStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ private void restoreFrom(BinlogPosition targetPosition)

this.schema_id = schemaRS.getLong("id");

if ( this.schema.findDatabase("mysql") == null ) {
LOGGER.info("Could not find mysql db, adding it to schema");
SchemaCapturer sc = new SchemaCapturer(connection, "mysql");
Database db = sc.capture().findDatabase("mysql");
this.schema.addDatabase(db);
shouldResave = true;
}

p = connection.prepareStatement("SELECT * from `maxwell`.`databases` where schema_id = ? ORDER by id");
p.setLong(1, this.schema_id);

Expand Down Expand Up @@ -277,7 +285,7 @@ private void restoreTable(Database d, String name, int id, String encoding, Stri
while (cRS.next()) {
String[] enumValues = null;
if ( cRS.getString("enum_values") != null )
enumValues = StringUtils.split(cRS.getString("enum_values"), ",");
enumValues = StringUtils.splitByWholeSeparatorPreserveAllTokens(cRS.getString("enum_values"), ",");

ColumnDef c = ColumnDef.build(t.getName(),
cRS.getString("name"), cRS.getString("encoding"),
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/com/zendesk/maxwell/SchemaCaptureTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void testDatabases() throws SQLException, SchemaSyncError {
Schema s = capturer.capture();
String dbs = StringUtils.join(s.getDatabaseNames().iterator(), ":");

assertEquals("maxwell:shard_1:shard_2:test", dbs);
assertEquals("maxwell:mysql:shard_1:shard_2:test", dbs);
}

@Test
Expand Down
19 changes: 13 additions & 6 deletions src/test/java/com/zendesk/maxwell/SchemaStoreTest.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.zendesk.maxwell;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.assertThat;

import java.io.IOException;
Expand All @@ -14,6 +12,7 @@
import org.junit.Before;
import org.junit.Test;

import com.zendesk.maxwell.schema.Database;
import com.zendesk.maxwell.schema.Schema;
import com.zendesk.maxwell.schema.SchemaCapturer;
import com.zendesk.maxwell.schema.SchemaStore;
Expand Down Expand Up @@ -74,9 +73,8 @@ public void testUpgradeToFixServerIDBug() throws Exception {

SchemaStore restoredSchema = SchemaStore.restore(server.getConnection(), server.SERVER_ID, binlogPosition);

assertThat(restoredSchema.getSchema().equals(this.schemaStore.getSchema()), is(true));
assertThat(restoredSchema.getSchemaID(), is(badSchemaID + 1));

List<String> diffs = restoredSchema.getSchema().diff(this.schemaStore.getSchema(), "restored", "captured");
assert diffs.isEmpty() : "Expected empty schema diff, got" + diffs;
}

@Test
Expand All @@ -96,4 +94,13 @@ public void testMasterChange() throws Exception {
rs = server.getConnection().createStatement().executeQuery("SELECT * from `maxwell`.`positions`");
assertThat(rs.next(), is(false));
}

@Test
public void testRestoreMysqlDb() throws Exception {
Database db = this.schema.findDatabase("mysql");
this.schema.getDatabases().remove(db);
this.schemaStore.save();
SchemaStore restoredSchema = SchemaStore.restore(server.getConnection(), server.SERVER_ID, this.binlogPosition);
assertThat(restoredSchema.getSchema().findDatabase("mysql"), is(not(nullValue())));
}
}

0 comments on commit f1b6302

Please sign in to comment.