Skip to content

Commit

Permalink
Merge pull request #69 from jhc-systems/graphic
Browse files Browse the repository at this point in the history
add graphic data type support, tidy up ccsid mapping
  • Loading branch information
msillence authored Dec 11, 2023
2 parents 40a262f + 5d89c1c commit 2698ed8
Show file tree
Hide file tree
Showing 20 changed files with 863 additions and 718 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,41 @@

public class As400ChangeEventSourceFactory implements ChangeEventSourceFactory<As400Partition, As400OffsetContext> {

private final As400ConnectorConfig configuration;
private final As400ConnectorConfig snapshotConfig;
private final As400RpcConnection rpcConnection;
private final MainConnectionProvidingConnectionFactory<As400JdbcConnection> jdbcConnectionFactory;
private final ErrorHandler errorHandler;
private final EventDispatcher<As400Partition, TableId> dispatcher;
private final Clock clock;
private final As400DatabaseSchema schema;
private final As400ConnectorConfig configuration;
private final As400ConnectorConfig snapshotConfig;
private final As400RpcConnection rpcConnection;
private final MainConnectionProvidingConnectionFactory<As400JdbcConnection> jdbcConnectionFactory;
private final ErrorHandler errorHandler;
private final EventDispatcher<As400Partition, TableId> dispatcher;
private final Clock clock;
private final As400DatabaseSchema schema;

public As400ChangeEventSourceFactory(As400ConnectorConfig configuration, As400ConnectorConfig snapshotConfig,
As400RpcConnection rpcConnection,
MainConnectionProvidingConnectionFactory<As400JdbcConnection> jdbcConnectionFactory,
ErrorHandler errorHandler, EventDispatcher<As400Partition, TableId> dispatcher, Clock clock,
As400DatabaseSchema schema) {
this.configuration = configuration;
this.rpcConnection = rpcConnection;
this.jdbcConnectionFactory = jdbcConnectionFactory;
this.errorHandler = errorHandler;
this.dispatcher = dispatcher;
this.clock = clock;
this.schema = schema;
this.snapshotConfig = snapshotConfig;
}
public As400ChangeEventSourceFactory(As400ConnectorConfig configuration, As400ConnectorConfig snapshotConfig,
As400RpcConnection rpcConnection,
MainConnectionProvidingConnectionFactory<As400JdbcConnection> jdbcConnectionFactory,
ErrorHandler errorHandler, EventDispatcher<As400Partition, TableId> dispatcher, Clock clock,
As400DatabaseSchema schema) {
this.configuration = configuration;
this.rpcConnection = rpcConnection;
this.jdbcConnectionFactory = jdbcConnectionFactory;
this.errorHandler = errorHandler;
this.dispatcher = dispatcher;
this.clock = clock;
this.schema = schema;
this.snapshotConfig = snapshotConfig;
}

@Override
public SnapshotChangeEventSource<As400Partition, As400OffsetContext> getSnapshotChangeEventSource(
SnapshotProgressListener<As400Partition> snapshotProgressListener,
NotificationService<As400Partition, As400OffsetContext> notificationService) {
return new As400SnapshotChangeEventSource(snapshotConfig, rpcConnection, jdbcConnectionFactory, schema,
dispatcher, clock, snapshotProgressListener, notificationService);
}
@Override
public SnapshotChangeEventSource<As400Partition, As400OffsetContext> getSnapshotChangeEventSource(
SnapshotProgressListener<As400Partition> snapshotProgressListener,
NotificationService<As400Partition, As400OffsetContext> notificationService) {
return new As400SnapshotChangeEventSource(snapshotConfig, rpcConnection, jdbcConnectionFactory, schema,
dispatcher, clock, snapshotProgressListener, notificationService);
}

@Override
public StreamingChangeEventSource<As400Partition, As400OffsetContext> getStreamingChangeEventSource() {
return new As400StreamingChangeEventSource(configuration, rpcConnection, jdbcConnectionFactory.mainConnection(),
dispatcher, errorHandler, clock, schema);
}
@Override
public StreamingChangeEventSource<As400Partition, As400OffsetContext> getStreamingChangeEventSource() {
return new As400StreamingChangeEventSource(configuration, rpcConnection, jdbcConnectionFactory.mainConnection(),
dispatcher, errorHandler, clock, schema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public class As400ConnectorConfig extends RelationalDatabaseConnectorConfig {

public static final Field TO_CCSID = Field.create("to_ccsid", "to ccsid", "when the table indicates the from_ccsid translate to this to_ccsid setting", -1);

public static final Field DIAGNOSTICS_FOLDER = Field.create("diagnostics_folder",
"folder to dump failed decodings to", "used when there is a decoding failure to aid diagnostics");

/**
* Maximum number of journal entries to process server side
*/
Expand Down Expand Up @@ -189,6 +192,10 @@ public boolean isSecure() {
return config.getBoolean(SECURE);
}

public String diagnosticsFolder() {
return config.getString(DIAGNOSTICS_FOLDER);
}

public JournalProcessedPosition getOffset() {
final String receiver = config.getString(As400OffsetContext.RECEIVER);
final String lib = config.getString(As400OffsetContext.RECEIVER_LIBRARY);
Expand Down Expand Up @@ -224,14 +231,16 @@ protected SourceInfoStructMaker<?> getSourceInfoStructMaker(Version version) {

public static Field.Set ALL_FIELDS = Field.setOf(JdbcConfiguration.HOSTNAME, USER, PASSWORD, SCHEMA, BUFFER_SIZE,
RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, KEEP_ALIVE, THREAD_USED, SOCKET_TIMEOUT,
MAX_SERVER_SIDE_ENTRIES, TOPIC_NAMING_STRATEGY, FROM_CCSID, TO_CCSID, DB_ERRORS, DATE_FORMAT, SECURE);
MAX_SERVER_SIDE_ENTRIES, TOPIC_NAMING_STRATEGY, FROM_CCSID, TO_CCSID, DB_ERRORS, DATE_FORMAT, SECURE,
DIAGNOSTICS_FOLDER);

public static ConfigDef configDef() {
final ConfigDef c = RelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("ibmi")
.type(
JdbcConfiguration.HOSTNAME, USER, PASSWORD, SCHEMA, BUFFER_SIZE,
KEEP_ALIVE, THREAD_USED, SOCKET_TIMEOUT, FROM_CCSID, TO_CCSID, DB_ERRORS, DATE_FORMAT, SECURE)
KEEP_ALIVE, THREAD_USED, SOCKET_TIMEOUT, FROM_CCSID, TO_CCSID, DB_ERRORS, DATE_FORMAT, SECURE,
DIAGNOSTICS_FOLDER)
.connector()
.events(
As400OffsetContext.EVENT_SEQUENCE_FIELD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,91 +26,92 @@

public class As400DatabaseSchema extends RelationalDatabaseSchema implements SchemaCacheIF {

private static final Logger log = LoggerFactory.getLogger(As400DatabaseSchema.class);
private final As400ConnectorConfig config;
private final Map<String, TableInfo> map = new HashMap<>();
private final As400JdbcConnection jdbcConnection;
private final SchemaInfoConversion schemaInfoConversion;
private final JdbcFileDecoder fileDecoder;

public As400DatabaseSchema(As400ConnectorConfig config, As400JdbcConnection jdbcConnection,
TopicNamingStrategy<TableId> topicSelector, SchemaNameAdjuster schemaNameAdjuster) {
super(config, topicSelector, config.getTableFilters().dataCollectionFilter(), config.getColumnFilter(),
new TableSchemaBuilder(new As400ValueConverters(), new As400DefaultValueConverter(), schemaNameAdjuster,
config.customConverterRegistry(), config.getSourceInfoStructMaker().schema(),
config.getFieldNamer(), false),
false, config.getKeyMapper());

this.config = config;
this.jdbcConnection = jdbcConnection;
fileDecoder = new JdbcFileDecoder(jdbcConnection, jdbcConnection.getRealDatabaseName(), this,
config.getToCcsid());

schemaInfoConversion = new SchemaInfoConversion(fileDecoder);
}

public JdbcFileDecoder getFileDecoder() {
return fileDecoder;
}

public void clearCache(String systemTableName, String schema) {
fileDecoder.clearCache(systemTableName, schema);
}

public Optional<TableInfo> getRecordFormat(String systemTableName, String schema) {
final Optional<TableInfo> oti = fileDecoder.getRecordFormat(systemTableName, schema);
oti.stream().forEach(ti -> {
store(jdbcConnection.getRealDatabaseName(), schema, systemTableName, ti);
});
return oti;
}

// assume always long name - only called from snapshotting
public void addSchema(Table table) {
final TableId id = table.id();

// save for decoding
final Optional<String> systemTableNameOpt = jdbcConnection.getSystemName(id.schema(), id.table());
systemTableNameOpt.map(systemTableName -> {
final TableInfo tableInfo = schemaInfoConversion.table2TableInfo(table);
return map.put(toKey(id.catalog(), id.schema(), systemTableName), tableInfo);
});

forwardSchema(table);
}

public void forwardSchema(Table table) {
tables().overwriteTable(table);
this.buildAndRegisterSchema(table);
}

public String getSchemaName() {
return config.getSchema();
}

@Override
// implements SchemaCacheIF.store - system name tables/column names
// assume always short name - only called from the journal
public void store(String database, String schema, String tableName, TableInfo tableInfo) {
map.put(toKey(database, schema, tableName), tableInfo);

final Table table = SchemaInfoConversion.tableInfo2Table(database, schema, tableName, tableInfo);
forwardSchema(table);
}

@Override
// assume always short name - only called from the journal
public TableInfo retrieve(String database, String schema, String tableName) {
return map.get(toKey(database, schema, tableName));
}

@Override
// assume always short name - only called from the journal
public void clearCache(String database, String schema, String tableName) {
map.remove(toKey(database, schema, tableName));
}

private String toKey(String database, String schema, String tableName) {
return String.format("%s.%s.%s", database, schema, tableName);
}
private static final Logger log = LoggerFactory.getLogger(As400DatabaseSchema.class);
private final As400ConnectorConfig config;
private final Map<String, TableInfo> map = new HashMap<>();
private final As400JdbcConnection jdbcConnection;
private final SchemaInfoConversion schemaInfoConversion;
private final JdbcFileDecoder fileDecoder;

public As400DatabaseSchema(As400ConnectorConfig config, As400JdbcConnection jdbcConnection,
TopicNamingStrategy<TableId> topicSelector, SchemaNameAdjuster schemaNameAdjuster) {
super(config, topicSelector, config.getTableFilters().dataCollectionFilter(), config.getColumnFilter(),
new TableSchemaBuilder(new As400ValueConverters(), new As400DefaultValueConverter(), schemaNameAdjuster,
config.customConverterRegistry(), config.getSourceInfoStructMaker().schema(),
config.getFieldNamer(), false),
false, config.getKeyMapper());

this.config = config;
this.jdbcConnection = jdbcConnection;
fileDecoder = new JdbcFileDecoder(jdbcConnection, jdbcConnection.getRealDatabaseName(), this,
config.getFromCcsid(),
config.getToCcsid());

schemaInfoConversion = new SchemaInfoConversion(fileDecoder);
}

public JdbcFileDecoder getFileDecoder() {
return fileDecoder;
}

public void clearCache(String systemTableName, String schema) {
fileDecoder.clearCache(systemTableName, schema);
}

public Optional<TableInfo> getRecordFormat(String systemTableName, String schema) {
final Optional<TableInfo> oti = fileDecoder.getRecordFormat(systemTableName, schema);
oti.stream().forEach(ti -> {
store(jdbcConnection.getRealDatabaseName(), schema, systemTableName, ti);
});
return oti;
}

// assume always long name - only called from snapshotting
public void addSchema(Table table) {
final TableId id = table.id();

// save for decoding
final Optional<String> systemTableNameOpt = jdbcConnection.getSystemName(id.schema(), id.table());
systemTableNameOpt.map(systemTableName -> {
final TableInfo tableInfo = schemaInfoConversion.table2TableInfo(table);
return map.put(toKey(id.catalog(), id.schema(), systemTableName), tableInfo);
});

forwardSchema(table);
}

public void forwardSchema(Table table) {
tables().overwriteTable(table);
this.buildAndRegisterSchema(table);
}

public String getSchemaName() {
return config.getSchema();
}

@Override
// implements SchemaCacheIF.store - system name tables/column names
// assume always short name - only called from the journal
public void store(String database, String schema, String tableName, TableInfo tableInfo) {
map.put(toKey(database, schema, tableName), tableInfo);

final Table table = SchemaInfoConversion.tableInfo2Table(database, schema, tableName, tableInfo);
forwardSchema(table);
}

@Override
// assume always short name - only called from the journal
public TableInfo retrieve(String database, String schema, String tableName) {
return map.get(toKey(database, schema, tableName));
}

@Override
// assume always short name - only called from the journal
public void clearCache(String database, String schema, String tableName) {
map.remove(toKey(database, schema, tableName));
}

private String toKey(String database, String schema, String tableName) {
return String.format("%s.%s.%s", database, schema, tableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public As400RpcConnection(As400ConnectorConfig config, As400StreamingChangeEvent
.withJournalInfo(journalInfo)
.withMaxServerSideEntries(config.getMaxServerSideEntries())
.withServerFiltering(true)
.withIncludeFiles(includes).build();
.withIncludeFiles(includes).withDumpFolder(config.diagnosticsFolder()).build();
retrieveJournal = new RetrieveJournal(rconfig, journalInfoRetrieval);
}
catch (final IOException e) {
Expand Down
Loading

0 comments on commit 2698ed8

Please sign in to comment.