From 81a70301468ec970e4ad5b1e15058dc29075fa98 Mon Sep 17 00:00:00 2001 From: Tharanga Gamaethige Date: Thu, 15 Oct 2020 11:32:03 -0700 Subject: [PATCH 1/4] WIP version of the Cassandra change stream emitter. --- build.gradle | 1 + .../sidecar/common/testing/CassandraPod.java | 3 +- src/main/dist/conf/sidecar.yaml | 3 + .../apache/cassandra/sidecar/CQLSession.java | 115 +++++++ .../sidecar/CassandraSidecarDaemon.java | 7 +- .../cassandra/sidecar/Configuration.java | 28 +- .../apache/cassandra/sidecar/MainModule.java | 13 + .../cassandra/sidecar/cdc/CDCBookmark.java | 213 ++++++++++++ .../sidecar/cdc/CDCIndexWatcher.java | 94 +++++ .../sidecar/cdc/CDCRawDirectoryMonitor.java | 90 +++++ .../sidecar/cdc/CDCReaderService.java | 157 +++++++++ .../sidecar/cdc/CassandraConfig.java | 68 ++++ .../apache/cassandra/sidecar/cdc/Change.java | 118 +++++++ .../sidecar/cdc/CommitLogReader.java | 321 ++++++++++++++++++ .../sidecar/cdc/MutationHandler.java | 121 +++++++ .../cassandra/sidecar/cdc/PayloadType.java | 23 ++ .../sidecar/cdc/output/ConsoleOutput.java | 52 +++ .../cassandra/sidecar/cdc/output/Output.java | 12 + .../sidecar/metrics/cdc/CDCReaderMonitor.java | 11 + .../metrics/cdc/CDCReaderMonitorLogger.java | 39 +++ 20 files changed, 1484 insertions(+), 5 deletions(-) create mode 100644 src/main/java/org/apache/cassandra/sidecar/CQLSession.java create mode 100644 src/main/java/org/apache/cassandra/sidecar/cdc/CDCBookmark.java create mode 100644 src/main/java/org/apache/cassandra/sidecar/cdc/CDCIndexWatcher.java create mode 100644 src/main/java/org/apache/cassandra/sidecar/cdc/CDCRawDirectoryMonitor.java create mode 100644 src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java create mode 100644 src/main/java/org/apache/cassandra/sidecar/cdc/CassandraConfig.java create mode 100644 src/main/java/org/apache/cassandra/sidecar/cdc/Change.java create mode 100644 src/main/java/org/apache/cassandra/sidecar/cdc/CommitLogReader.java create mode 100644 
src/main/java/org/apache/cassandra/sidecar/cdc/MutationHandler.java create mode 100644 src/main/java/org/apache/cassandra/sidecar/cdc/PayloadType.java create mode 100644 src/main/java/org/apache/cassandra/sidecar/cdc/output/ConsoleOutput.java create mode 100644 src/main/java/org/apache/cassandra/sidecar/cdc/output/Output.java create mode 100644 src/main/java/org/apache/cassandra/sidecar/metrics/cdc/CDCReaderMonitor.java create mode 100644 src/main/java/org/apache/cassandra/sidecar/metrics/cdc/CDCReaderMonitorLogger.java diff --git a/build.gradle b/build.gradle index 8bfc5903f..63d5030ac 100644 --- a/build.gradle +++ b/build.gradle @@ -179,6 +179,7 @@ dependencies { compile project(":common") compile project(":cassandra40") + compile 'org.apache.cassandra:cassandra-all:4.0-beta2' } task copyCodeStyle(type: Copy) { diff --git a/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraPod.java b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraPod.java index 90141461e..afa658e4d 100644 --- a/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraPod.java +++ b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraPod.java @@ -193,7 +193,8 @@ public void start() throws ApiException, InterruptedException, CassandraPodExcep } started = namespacedPod.getStatus().getContainerStatuses().get(0).getStarted(); - if (namespacedPod.getStatus().getContainerStatuses().get(0).getReady() && started) { + if (namespacedPod.getStatus().getContainerStatuses().get(0).getReady() && started) + { logger.info("Pod startup OK"); break; } diff --git a/src/main/dist/conf/sidecar.yaml b/src/main/dist/conf/sidecar.yaml index f7e1ce3a5..20f6a0816 100644 --- a/src/main/dist/conf/sidecar.yaml +++ b/src/main/dist/conf/sidecar.yaml @@ -24,3 +24,6 @@ sidecar: healthcheck: - poll_freq_millis: 30000 + +cdc: + - configPath: 
file:////etc/cassandra/conf/cassandra.yaml \ No newline at end of file diff --git a/src/main/java/org/apache/cassandra/sidecar/CQLSession.java b/src/main/java/org/apache/cassandra/sidecar/CQLSession.java new file mode 100644 index 000000000..d199a94f2 --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/CQLSession.java @@ -0,0 +1,115 @@ +package org.apache.cassandra.sidecar; + +import java.net.InetSocketAddress; +import java.util.Collections; + +import javax.annotation.Nullable; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.NettyOptions; +import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.policies.ExponentialReconnectionPolicy; +import com.datastax.driver.core.policies.ReconnectionPolicy; +import com.datastax.driver.core.policies.RoundRobinPolicy; +import com.datastax.driver.core.policies.WhiteListPolicy; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +/** + * Represents a connection to Cassandra cluster. Currently supports returning the local connection only as + * defined in the Configuration. 
+ */ +@Singleton +public class CQLSession +{ + private static final Logger logger = LoggerFactory.getLogger(CQLSession.class); + @Nullable + private Session localSession; + private final InetSocketAddress inet; + private final WhiteListPolicy wlp; + private NettyOptions nettyOptions; + private QueryOptions queryOptions; + private ReconnectionPolicy reconnectionPolicy; + + @Inject + public CQLSession(Configuration configuration) + { + inet = InetSocketAddress.createUnresolved(configuration.getCassandraHost(), configuration.getCassandraPort()); + wlp = new WhiteListPolicy(new RoundRobinPolicy(), Collections.singletonList(inet)); + this.nettyOptions = new NettyOptions(); + this.queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE); + this.reconnectionPolicy = new ExponentialReconnectionPolicy(1000, + configuration.getHealthCheckFrequencyMillis()); + } + + @VisibleForTesting + CQLSession(InetSocketAddress target, NettyOptions options) + { + inet = target; + wlp = new WhiteListPolicy(new RoundRobinPolicy(), Collections.singletonList(inet)); + this.nettyOptions = options; + this.queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE); + reconnectionPolicy = new ExponentialReconnectionPolicy(100, 1000); + } + + /** + * Provides a Session connected only to the local node from configuration. If null it means the connection was + * not able to be established. The session still might throw a NoHostAvailableException if the local host goes + * offline or otherwise unavailable. 
+ * + * @return Session + */ + @Nullable + public synchronized Session getLocalCql() + { + Cluster cluster = null; + try + { + if (localSession == null) + { + cluster = Cluster.builder() + .addContactPointsWithPorts(inet) + .withLoadBalancingPolicy(wlp) + .withQueryOptions(queryOptions) + .withReconnectionPolicy(reconnectionPolicy) + .withoutMetrics() + // tests can create a lot of these Cluster objects, to avoid creating HWTs and + // event thread pools for each we have the override + .withNettyOptions(nettyOptions) + .build(); + localSession = cluster.connect(); + } + } + catch (Exception e) + { + logger.debug("Failed to reach Cassandra", e); + if (cluster != null) + { + try + { + cluster.close(); + } + catch (Exception ex) + { + logger.debug("Failed to close cluster in cleanup", ex); + } + } + } + return localSession; + } + + public synchronized void close() + { + if (localSession != null) + { + localSession.getCluster().close(); + localSession = null; + } + } +} diff --git a/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java b/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java index 63e503ff0..f06d33d72 100644 --- a/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java +++ b/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java @@ -27,8 +27,8 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import io.vertx.core.http.HttpServer; +import org.apache.cassandra.sidecar.cdc.CDCReaderService; import org.apache.cassandra.sidecar.utils.SslUtils; - /** * Main class for initiating the Cassandra sidecar */ @@ -38,12 +38,14 @@ public class CassandraSidecarDaemon private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class); private final HttpServer server; private final Configuration config; + private final CDCReaderService cdcReaderService; @Inject - public CassandraSidecarDaemon(HttpServer server, Configuration config) + public 
CassandraSidecarDaemon(HttpServer server, Configuration config, CDCReaderService cdcReaderService) { this.server = server; this.config = config; + this.cdcReaderService = cdcReaderService; } public void start() @@ -51,6 +53,7 @@ public void start() banner(System.out); validate(); logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort()); + cdcReaderService.start(); server.listen(config.getPort(), config.getHost()); } diff --git a/src/main/java/org/apache/cassandra/sidecar/Configuration.java b/src/main/java/org/apache/cassandra/sidecar/Configuration.java index 5f289c581..dd537e160 100644 --- a/src/main/java/org/apache/cassandra/sidecar/Configuration.java +++ b/src/main/java/org/apache/cassandra/sidecar/Configuration.java @@ -55,12 +55,17 @@ public class Configuration private final boolean isSslEnabled; + /* Cassandra server conf path */ + @Nullable + private String cassandraConfigPath; + public Configuration(String cassandraHost, Integer cassandraPort, String host, Integer port, Integer healthCheckFrequencyMillis, boolean isSslEnabled, @Nullable String keyStorePath, @Nullable String keyStorePassword, @Nullable String trustStorePath, - @Nullable String trustStorePassword) + @Nullable String trustStorePassword, + @Nullable String cassandraConfigPath) { this.cassandraHost = cassandraHost; this.cassandraPort = cassandraPort; @@ -73,6 +78,8 @@ public Configuration(String cassandraHost, Integer cassandraPort, String host, I this.trustStorePath = trustStorePath; this.trustStorePassword = trustStorePassword; this.isSslEnabled = isSslEnabled; + + this.cassandraConfigPath = cassandraConfigPath; } /** @@ -179,6 +186,15 @@ public String getTruststorePassword() return trustStorePassword; } + /** + * Get path of the Cassandra configuration file + */ + @Nullable + public String getCassandraConfigPath() + { + return cassandraConfigPath; + } + /** * Configuration Builder */ @@ -194,6 +210,7 @@ public static class Builder private String trustStorePath; 
private String trustStorePassword; private boolean isSslEnabled; + private String cassandraConfigPath; public Builder setCassandraHost(String host) { @@ -255,10 +272,17 @@ public Builder setSslEnabled(boolean enabled) return this; } + public Builder setCassandraConfigPath(String configPath) + { + this.cassandraConfigPath = configPath; + return this; + } + public Configuration build() { return new Configuration(cassandraHost, cassandraPort, host, port, healthCheckFrequencyMillis, isSslEnabled, - keyStorePath, keyStorePassword, trustStorePath, trustStorePassword); + keyStorePath, keyStorePassword, trustStorePath, trustStorePassword, + cassandraConfigPath); } } } diff --git a/src/main/java/org/apache/cassandra/sidecar/MainModule.java b/src/main/java/org/apache/cassandra/sidecar/MainModule.java index 27eefd7c9..f19e32845 100644 --- a/src/main/java/org/apache/cassandra/sidecar/MainModule.java +++ b/src/main/java/org/apache/cassandra/sidecar/MainModule.java @@ -41,9 +41,13 @@ import io.vertx.ext.web.handler.LoggerHandler; import io.vertx.ext.web.handler.StaticHandler; import org.apache.cassandra.sidecar.cassandra40.Cassandra40Factory; +import org.apache.cassandra.sidecar.cdc.output.ConsoleOutput; +import org.apache.cassandra.sidecar.cdc.output.Output; import org.apache.cassandra.sidecar.common.CQLSession; import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.CassandraVersionProvider; +import org.apache.cassandra.sidecar.metrics.cdc.CDCReaderMonitor; +import org.apache.cassandra.sidecar.metrics.cdc.CDCReaderMonitorLogger; import org.apache.cassandra.sidecar.routes.HealthService; import org.apache.cassandra.sidecar.routes.SwaggerOpenApiResource; import org.jboss.resteasy.plugins.server.vertx.VertxRegistry; @@ -126,6 +130,14 @@ public Router vertxRouter(Vertx vertx) return router; } + @Override + protected void configure() + { + // TODO: Make the output type configurable + 
bind(CDCReaderMonitor.class).to(CDCReaderMonitorLogger.class); + bind(Output.class).to(ConsoleOutput.class); + } + @Provides @Singleton public Configuration configuration() throws ConfigurationException, IOException @@ -151,6 +163,7 @@ public Configuration configuration() throws ConfigurationException, IOException .setTrustStorePath(yamlConf.get(String.class, "sidecar.ssl.truststore.path", null)) .setTrustStorePassword(yamlConf.get(String.class, "sidecar.ssl.truststore.password", null)) .setSslEnabled(yamlConf.get(Boolean.class, "sidecar.ssl.enabled", false)) + .setCassandraConfigPath(yamlConf.get(String.class, "cdc.configPath")) .build(); } catch (MalformedURLException e) diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCBookmark.java b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCBookmark.java new file mode 100644 index 000000000..bd4914773 --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCBookmark.java @@ -0,0 +1,213 @@ +package org.apache.cassandra.sidecar.cdc; + + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.locks.ReentrantLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.commitlog.CommitLogDescriptor; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.sidecar.Configuration; + +/** + * Manages the CDC reader bookmark. This tracks the last successfully processed offset + * of a commit log. 
+ */ +@Singleton +public class CDCBookmark extends TimerTask +{ + // Tracks last disk sync'd commit log position. + private CommitLogPosition lastSyncedPosition; + // Tracks last successfully processed commit log position by the CDC reader. + private CommitLogPosition lastProcessedPosition; + private final Timer timer; + private static final String BOOKMARK = "CdcReader.bookmark"; + private static final Logger logger = LoggerFactory.getLogger(CDCBookmark.class); + private final ReentrantLock bookmarkLock = new ReentrantLock(); + private final Configuration conf; + + @Inject + CDCBookmark(Configuration conf) + { + this.lastSyncedPosition = null; + this.lastProcessedPosition = null; + this.conf = conf; + this.timer = new Timer(); + } + + /** + * Persists last successfully processed commit log offset to the disk. + */ + public void syncBookmark() + { + CommitLogPosition lastPosition = this.getLastProcessedPosition(); + + if (lastPosition == null) + { + return; + } + logger.debug("Last processed bookmark {}", this.lastProcessedPosition.toString()); + try + { + if (lastPosition.equals(this.lastSyncedPosition)) + { + return; + } + + CommitLogPosition.CommitLogPositionSerializer serializer = + new CommitLogPosition.CommitLogPositionSerializer(); + + // TODO: JSON ser-de and write-rename instead of writing directly to the bookmark + try (FileOutputStream fileOutputStream = new FileOutputStream( + new File(this.getBookmarkPath()))) + { + DataOutputPlus outBuffer = new DataOutputBuffer(); + serializer.serialize(lastPosition, outBuffer); + fileOutputStream.write(((DataOutputBuffer) outBuffer).getData()); + fileOutputStream.flush(); + this.lastSyncedPosition = lastPosition; + logger.info("Successfully synced bookmark {} to the file {}", this.lastSyncedPosition.toString(), + this.getBookmarkPath()); + } + catch (IOException e) + { + logger.error("Error when writing bookmark {} to the file {}", lastPosition.toString(), + this.getBookmarkPath()); + } + } + catch (Exception ex) + 
{ + logger.error("Sync exception {}", ex.getMessage()); + } + } + + /** + * Gets the path to the CDC reader bookmark. + * + * @return complete path to the bookmark file. + */ + public String getBookmarkPath() + { + return String.format("%s/%s", DatabaseDescriptor.getCDCLogLocation(), + BOOKMARK); + } + + @Override + public void run() + { + this.syncBookmark(); + } + + /** + * Gets the last successfully processed commit log offset. + * This method is thread safe. + * + * @return last successfully processed commit log offset. + */ + public CommitLogPosition getLastProcessedPosition() + { + CommitLogPosition lastPosition = null; + try + { + bookmarkLock.lock(); + if (this.lastProcessedPosition != null) + { + lastPosition = new CommitLogPosition(this.lastProcessedPosition.segmentId, + this.lastProcessedPosition.position); + + } + } + finally + { + bookmarkLock.unlock(); + } + return lastPosition; + } + + /** + * Sets the last successfully processed commit log offset. + * This method is thread safe. + * + */ + public void setLastProcessedPosition(CommitLogPosition processedPosition) + { + try + { + bookmarkLock.lock(); + this.lastProcessedPosition = processedPosition; + } + finally + { + bookmarkLock.unlock(); + } + } + + /** + * Starts the background thread to write processed commit log positions to the disk. + * */ + public void startBookmarkSync() + { + timer.schedule(this, 0, DatabaseDescriptor.getCommitLogSyncPeriod()); + } + + /** + * Gets the persisted commit log offset from the bookmark on the disk. + * + * @return persisted commit log offset. 
+ */ + public CommitLogPosition getPersistedBookmark() + { + CommitLogPosition.CommitLogPositionSerializer serializer = + new CommitLogPosition.CommitLogPositionSerializer(); + try (FileInputStream fileInputStream = + new FileInputStream(new File(this.getBookmarkPath()))) + { + DataInputPlus inputBuffer = new DataInputPlus.DataInputStreamPlus(fileInputStream); + return serializer.deserialize(inputBuffer); + } + catch (IOException ex) + { + logger.error("Error when reading the saved bookmark {}", this.getBookmarkPath()); + return null; + } + } + + /** + * Checks whether there's a valid persisted bookmark. + * + * @return whether there's a valid bookmark. + */ + public boolean isValidBookmark() + { + CommitLogPosition bookmark = getPersistedBookmark(); + if (bookmark == null) + { + return false; + } + // It's fine for compression to be null as we are not using this CommitLogDescriptor to write commit logs. + CommitLogDescriptor commitLogDescriptor = new CommitLogDescriptor(bookmark.segmentId, null, + null); + + if (commitLogDescriptor == null || + !Paths.get(DatabaseDescriptor.getCDCLogLocation(), + commitLogDescriptor.cdcIndexFileName()).toFile().exists() || + !Paths.get(DatabaseDescriptor.getCDCLogLocation(), + commitLogDescriptor.fileName()).toFile().exists()) + { + return false; + } + return true; + } +} diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCIndexWatcher.java b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCIndexWatcher.java new file mode 100644 index 000000000..733ec9eec --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCIndexWatcher.java @@ -0,0 +1,94 @@ +package org.apache.cassandra.sidecar.cdc; + +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.time.Instant; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.sun.nio.file.SensitivityWatchEventModifier; +import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.utils.Pair; +/** + * Watches CDC index and produce commit log offsets to be read and processed. + */ +@Singleton +public class CDCIndexWatcher implements Runnable +{ + private static final Logger logger = LoggerFactory.getLogger(CDCIndexWatcher.class); + private WatchService watcher; + private WatchKey key; + private Path dir; + private final org.apache.cassandra.sidecar.cdc.CommitLogReader commitLogReader; + private ExecutorService cdcWatcherExecutor; + private volatile boolean running; + + @Inject + CDCIndexWatcher(CommitLogReader commitLogReader) + { + this.commitLogReader = commitLogReader; + this.running = true; + } + + @Override + public void run() + { + this.dir = Paths.get(DatabaseDescriptor.getCDCLogLocation()); + this.cdcWatcherExecutor = Executors.newSingleThreadExecutor(); + + this.cdcWatcherExecutor.submit(() -> + { + try + { + this.watcher = FileSystems.getDefault().newWatchService(); + this.key = Paths.get(dir.toUri()).register(this.watcher, + new WatchEvent.Kind[]{ENTRY_MODIFY}, + SensitivityWatchEventModifier.HIGH); + while (running) + { + WatchKey aKey = watcher.take(); + if (!key.equals(aKey)) + { + logger.error("WatchKey not recognized."); + continue; + } + for (WatchEvent event : key.pollEvents()) + { + WatchEvent.Kind kind = event.kind(); + WatchEvent ev = (WatchEvent) event; + Path relativePath = ev.context(); + Path absolutePath = dir.resolve(relativePath); + logger.debug("Event type : {}, Path : {}", event.kind().name(), absolutePath); + logger.debug("Event timestamp in milliseconds : {}", Instant.now().toEpochMilli()); + if (!CommitLogReader.isValidIndexFile(absolutePath.getFileName().toString())) + { + continue; + 
} + this.commitLogReader.submitReadRequest(Pair.create(absolutePath, 0)); + } + key.reset(); + } + } + catch (Throwable throwable) + { + logger.error("Error when watching the CDC dir : {}", throwable.getMessage()); + } + }); + this.commitLogReader.start(); + } + + public void stop() + { + running = false; + this.commitLogReader.stop(); + this.cdcWatcherExecutor.shutdown(); + } +} diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCRawDirectoryMonitor.java b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCRawDirectoryMonitor.java new file mode 100644 index 000000000..d8a9cd468 --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCRawDirectoryMonitor.java @@ -0,0 +1,90 @@ +package org.apache.cassandra.sidecar.cdc; + +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Timer; +import java.util.TimerTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.sidecar.metrics.cdc.CDCReaderMonitor; + +/** + * Monitors the cdc_raw directory, cleanup unused commit logs and report metrics + * */ +@Singleton +public class CDCRawDirectoryMonitor extends TimerTask +{ + + private final Timer timer; + private final CDCReaderMonitor monitor; + private volatile boolean running; + private static final Logger logger = LoggerFactory.getLogger(CDCRawDirectoryMonitor.class); + + @Inject + CDCRawDirectoryMonitor(CDCReaderMonitor monitor) + { + this.timer = new Timer(); + this.monitor = monitor; + this.running = false; + } + + /** + * Starts the background thread to monitor the cdc_raw dir. 
+ * */ + public void startMonitoring() + { + this.running = true; + timer.schedule(this, 0, DatabaseDescriptor.getCDCDiskCheckInterval()); + } + + @Override + public void run() + { + if (!this.running) + { + return; + } + // TODO : Don't be someone who just complains, do some useful work, clean files older than + // the last persisted bookmark. + this.monitor.reportCdcRawDirectorySizeInBytes(getCdcRawDirectorySize()); + } + + public synchronized void stop() + { + if (!this.running) + { + return; + } + this.running = false; + this.timer.cancel(); + } + + + private long getCdcRawDirectorySize() + { + long dirSize = 0; + try (DirectoryStream stream = + Files.newDirectoryStream(Paths.get(DatabaseDescriptor.getCDCLogLocation()))) + { + for (Path path : stream) + { + if (!Files.isDirectory(path)) + { + dirSize += Files.size(path); + } + } + } + catch (IOException ex) + { + logger.error("Error when calculating size of the cdc_raw dir {} : {}", + DatabaseDescriptor.getCDCLogLocation(), ex); + } + return dirSize; + } +} diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java new file mode 100644 index 000000000..065f29164 --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java @@ -0,0 +1,157 @@ +package org.apache.cassandra.sidecar.cdc; + +import java.io.InvalidObjectException; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Host; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.logging.Logger; +import io.vertx.core.logging.LoggerFactory; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.sidecar.CQLSession; + +/** + * Cassandra's real-time change data capture service. 
+ */ +@Singleton +public class CDCReaderService implements Host.StateListener +{ + private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class); + private final CDCIndexWatcher cdcIndexWatcher; + private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor; + private final CQLSession session; + private final CassandraConfig cassandraConfig; + + @Inject + public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session, + CassandraConfig cassandraConfig) + { + this.cdcRawDirectoryMonitor = monitor; + this.cdcIndexWatcher = cdcIndexWatcher; + this.session = session; + this.cassandraConfig = cassandraConfig; + } + + public synchronized void start() + { + try + { + // Wait until the Cassandra server is UP to load configs and start subsystems. There's no guarantee the + // config is valid otherwise. + waitForCassandraServer(); + Cluster cluster = session.getLocalCql().getCluster(); + if (cluster == null) + { + throw new InvalidObjectException("Cannot connect to the local Cassandra node"); + } + + // Ensure Cassandra config is valid and remove mutable data paths from the config + // to ensure CDC reader doesn't accidentally step on Cassandra data. + this.cassandraConfig.init(); + // TODO : Load metadata from the CQLSession. + Schema.instance.loadFromDisk(false); + this.cassandraConfig.muteConfigs(); + + for (String keySpace : Schema.instance.getKeyspaces()) + { + logger.info("Keyspace : {}", keySpace); + KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(keySpace); + if (keyspaceMetadata == null) + { + continue; + } + for (TableMetadata tableMetadata : keyspaceMetadata.tablesAndViews()) + { + logger.info("Table : {}, CDC enabled ? {}", tableMetadata.name, + tableMetadata.params.cdc ? "true" : "false"); + } + } + // Start monitoring the cdc_raw directory + this.cdcRawDirectoryMonitor.startMonitoring(); + // Start reading the current commit log. 
+ this.cdcIndexWatcher.run(); + + } + catch (Exception ex) + { + logger.error("Error starting the CDC reader {}", ex); + this.stop(); + return; + } + logger.info("Successfully started the CDC reader"); + + } + + public synchronized void stop() + { + logger.info("Stopping CDC reader..."); + this.cdcRawDirectoryMonitor.stop(); + this.cdcIndexWatcher.stop(); + logger.info("Successfully stopped the CDC reader"); + } + @Override + public void onAdd(Host host) + { + + } + + @Override + public void onUp(Host host) + { + + } + + @Override + public void onDown(Host host) + { + + } + + @Override + public void onRemove(Host host) + { + + } + + @Override + public void onRegister(Cluster cluster) + { + + } + + @Override + public void onUnregister(Cluster cluster) + { + + } + + /** + * Waiting for the Cassandra server. + * */ + private void waitForCassandraServer() throws InterruptedException + { + long retryIntervalMs = 1; + Cluster cluster = null; + + while (cluster == null) + { + if (this.session.getLocalCql() != null) + { + cluster = session.getLocalCql().getCluster(); + } + if (cluster != null) + { + break; + } + else + { + logger.info("Waiting for Cassandra server to start. 
Retrying after {} milliseconds", + retryIntervalMs); + Thread.sleep(retryIntervalMs); + retryIntervalMs *= 2; + } + } + } +} diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/CassandraConfig.java b/src/main/java/org/apache/cassandra/sidecar/cdc/CassandraConfig.java new file mode 100644 index 000000000..1c4b8d9bc --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/CassandraConfig.java @@ -0,0 +1,68 @@ +package org.apache.cassandra.sidecar.cdc; + +import java.io.File; +import java.nio.file.Paths; +import java.util.function.Supplier; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.YamlConfigurationLoader; +import org.apache.cassandra.sidecar.Configuration; + +/** + * Custom Cassandra configurator + */ +@Singleton +public class CassandraConfig implements Supplier +{ + private Config config; + + @Inject + public CassandraConfig(Configuration config) + { + System.setProperty("cassandra.config", config.getCassandraConfigPath()); + this.config = null; + } + + public synchronized void init() throws IllegalArgumentException + { + this.config = new YamlConfigurationLoader().loadConfig(); + // TODO DatabaseDescriptor.initBasicConfigs(); ? 
+ DatabaseDescriptor.toolInitialization(); + if (!DatabaseDescriptor.isCDCEnabled()) + { + throw new IllegalArgumentException("CDC is not enabled in Cassandra, CDC reader will not start"); + } + + if (DatabaseDescriptor.getCDCLogLocation() == null || DatabaseDescriptor.getCDCLogLocation().equals("")) + { + throw new IllegalArgumentException("cdc_raw_directory location is not set, cannot start the CDC reader"); + } + + File cdcPath = Paths.get(DatabaseDescriptor.getCDCLogLocation()).toFile(); + + if (!cdcPath.exists()) + { + throw new IllegalArgumentException(String.format("Configured Cassandra cdc_raw_directory [%s] doesn't " + + "exist", cdcPath)); + } + } + + public synchronized void muteConfigs() + { + assert (this.config != null); + this.config.hints_directory = null; + this.config.data_file_directories = new String[0]; + this.config.saved_caches_directory = null; + Config.setOverrideLoadConfig(this); + Config.setClientMode(true); + DatabaseDescriptor.toolInitialization(); + } + + @Override + public Config get() + { + return this.config; + } +} diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/Change.java b/src/main/java/org/apache/cassandra/sidecar/cdc/Change.java new file mode 100644 index 000000000..74096e5c8 --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/Change.java @@ -0,0 +1,118 @@ +package org.apache.cassandra.sidecar.cdc; + +import java.nio.ByteBuffer; +import org.apache.cassandra.db.partitions.PartitionUpdate; +/* + _______________________________________________________________________________ + | Envelope | Payload | Payload | | | + | Version | Type | Version | Flags | Serialized Payload bytes| + | (1 byte) | (1 byte) | (1 byte) | (1 byte) | (variable length) | + |_____________|___________|____________|___________|___________________________| + +Envelope Version +================ +Version of the envelope, changes with structural changes to the envelope. + +Payload Type +============ +The type of the payload. 
Defined in the PayloadType enum. Example +types are PartitionUpdate, Mutation, and URI. + +Payload Version +=============== +Some payloads (e.g. PartitionUpdate) need a version for de-serializing. + +Flags +===== +Any custom flags that depends on the use case. E.g. someone can use these to differentiate +between snapshot and change events. + +Serialized Payload +===================== +Serialized payload. This is just a byte array. + +*/ + +/** + * Defines the envelope of a CDC event + * + * */ +public class Change +{ + public static final byte CHANGE_EVENT = 0; + public static final byte REFRESH_EVENT = 1; + // Envelope version, update with envelope changes. + public static final byte ENVELOPE_VERSION = 1; + // Size of the header, envelopeVersion + payloadType + payloadVersion + flags + public static final int HEADER_SIZE = 4; + + private byte envelopeVersion; + + private byte payloadType; + + private byte payloadVersion; + + private byte flags; + + private byte[] payload; + + private String partitionKey; + + + public Change(PayloadType payloadType, int version, byte flags, PartitionUpdate partitionUpdate) + { + this.envelopeVersion = Change.ENVELOPE_VERSION; + this.payloadType = payloadType.getValue(); + this.payloadVersion = (byte) version; + assert ((int) this.payloadVersion) == version; + this.flags = flags; + this.payload = PartitionUpdate.toBytes(partitionUpdate, this.payloadVersion).array(); + this.partitionKey = partitionUpdate.metadata().partitionKeyType.getString(partitionUpdate.partitionKey() + .getKey()); + } + + public Change(byte[] serializedChange) + { + ByteBuffer buff = ByteBuffer.wrap(serializedChange); + this.envelopeVersion = buff.get(0); + assert this.envelopeVersion == Change.ENVELOPE_VERSION; + this.payloadType = buff.get(1); + this.payloadVersion = buff.get(2); + this.flags = buff.get(3); + this.payload = new byte[serializedChange.length - Change.HEADER_SIZE]; + buff.position(Change.HEADER_SIZE); + buff.get(this.payload, 0, 
this.payload.length); + } + + public byte[] toBytes() throws Exception + { + // We don't need to serialize the partition key + ByteBuffer dob = ByteBuffer.allocate(Change.HEADER_SIZE + this.payload.length); + dob.put(this.envelopeVersion); + dob.put(this.payloadType); + dob.put(this.payloadVersion); + dob.put(this.flags); + dob.put(this.payload); + return dob.array(); + } + + public PartitionUpdate getPartitionUpdateObject() throws Exception + { + if (this.payload == null || this.payloadType != PayloadType.PARTITION_UPDATE.getValue()) + { + throw new Exception(String.format("Invalid payloadType (%d), expected (%d)", this.payloadType, + PayloadType.PARTITION_UPDATE.getValue())); + } + return PartitionUpdate.fromBytes(ByteBuffer.wrap(this.payload), (int) this.payloadVersion); + } + + public String getPartitionKey() + { + return this.partitionKey; + } + + public int getPayloadVersion() + { + return (int) this.payloadVersion; + } +} diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/CommitLogReader.java b/src/main/java/org/apache/cassandra/sidecar/cdc/CommitLogReader.java new file mode 100644 index 000000000..a0a6970b8 --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/CommitLogReader.java @@ -0,0 +1,321 @@ +package org.apache.cassandra.sidecar.cdc; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.regex.Pattern; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.inject.Inject; +import org.apache.cassandra.config.DatabaseDescriptor; +import 
org.apache.cassandra.db.commitlog.CommitLogDescriptor; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.utils.Pair; + +/** + * Reads a Cassandra commit log. Read offsets are provided by the CDCIndexWatcher + */ +public class CommitLogReader +{ + + private static final Logger logger = LoggerFactory.getLogger(CommitLogReader.class); + private final ExecutorService executor; + private LinkedBlockingDeque<Pair<Path, Integer>> commitLogReadRequests; + private int prevOffset = 0; + private final org.apache.cassandra.db.commitlog.CommitLogReader commitLogReader; + private Path prevCommitLogFilePath = null; + private Path prevIndexFilePath = null; + private final MutationHandler mutationHandler; + private CDCBookmark bookmark; + private volatile boolean running; + + // These are copied from CommitLogDescriptor class. Wonders of the current Cass CDC design. (sigh) + private static final String SEPARATOR = "-"; + private static final String FILENAME_PREFIX = "CommitLog" + SEPARATOR; + private static final String FILENAME_EXTENSION = ".log"; + + private static final String IDX_FILENAME_SUFFIX = "_cdc.idx"; + private static final Pattern COMMIT_LOG_IDX_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + + "((\\d+)(" + SEPARATOR + "\\d+)?)" + IDX_FILENAME_SUFFIX); + + @Inject + CommitLogReader(MutationHandler mutationHandler, CDCBookmark cdcBookmark) + { + this.executor = Executors.newSingleThreadExecutor(); //TODO: Single reader, can be multiple readers, but watch + // for the commit log deletion process, it has to be changed. + //TODO : currently this is unbounded, do we want to bound it and add alerts? 
+ this.commitLogReadRequests = new LinkedBlockingDeque<>(); + this.commitLogReader = new org.apache.cassandra.db.commitlog.CommitLogReader(); + this.mutationHandler = mutationHandler; + this.bookmark = cdcBookmark; + this.running = true; + } + + public void start() + { + // Before sending out the live change stream, process changes since the last bookmark if there are valid + // ones. + this.processOldCommitLogs(); + + executor.submit(() -> + { + try + { + this.bookmark.startBookmarkSync(); + // Process the live change stream. + while (this.running) + { + logger.debug("Waiting for a new event"); + this.processCDCWatchEvent(this.commitLogReadRequests.take()); + } + } + catch (Throwable e) + { + logger.error("Error handling the CDC watch event : {} ", e.getMessage(), e); + } + logger.info("Exit processing watch events"); + return; + }); + } + + public void submitReadRequest(Pair<Path, Integer> idxPath) + { + this.commitLogReadRequests.add(idxPath); + } + + /** + * Checks whether the given file name is a valid index file. Validate file name against the expected format. + */ + public static boolean isValidIndexFile(String indexFileName) + { + return ((COMMIT_LOG_IDX_FILE_PATTERN.matcher(indexFileName)).matches()); + } + + /** + * Construct a CommitLogDescriptor from the index file name. + */ + public static CommitLogDescriptor fromIndexFile(String indexFileName) + { + if (!isValidIndexFile(indexFileName)) + { + logger.warn("Provided file name [{}] is not an CDC index file", indexFileName); + return null; + } + return CommitLogDescriptor.fromFileName(indexFileName.replace(IDX_FILENAME_SUFFIX, FILENAME_EXTENSION)); + } + + /** + * Gets a list of commit logs since the persisted bookmark. The ordering defines the sequence + * they should be processed by the CDC reader. + * + * @return ordered list of commit logs since the persisted bookmark. 
+ */ + public static SortedSet<CommitLogPosition> getSortedCommitLogPositionsSinceCommitLog( + CommitLogPosition bookmarkedPosition) + { + SortedSet<CommitLogPosition> commitLogPositions = new TreeSet<>(); + // If there's no bookmark, this could be the initial run or the bookmark is lost. In either case, we + // return an empty list of commit logs so caller can take necessary actions. + if (bookmarkedPosition == null) + { + return commitLogPositions; + } + + SortedSet<CommitLogPosition> allCommitLogs = new TreeSet<>(); + try (DirectoryStream<Path> stream = + Files.newDirectoryStream(Paths.get(DatabaseDescriptor.getCDCLogLocation()))) + { + for (Path path : stream) + { + if (!Files.isDirectory(path)) + { + Path fileNamePath = path.getFileName(); + if (fileNamePath == null) + { + continue; + } + String fileName = fileNamePath.toString(); + if (isValidIndexFile(fileName)) + { + allCommitLogs.add(new CommitLogPosition(fromIndexFile(fileName).id, 0)); + } + } + } + } + catch (IOException ex) + { + logger.error("Error accessing the CDC dir : {}", ex.getMessage()); + return commitLogPositions; + } + + if (allCommitLogs.size() == 0) + { + return commitLogPositions; + } + + // Use this flag to find the point of bookmark in the list of Idx files + // that are sorted by the commit log segment ids. + boolean foundBookmark = false; + + for (CommitLogPosition commitLog : allCommitLogs) + { + if (commitLog.segmentId == bookmarkedPosition.segmentId) + { + foundBookmark = true; + } + if (foundBookmark) + { + commitLogPositions.add(new CommitLogPosition(commitLog.segmentId, + commitLog.segmentId == bookmarkedPosition.segmentId ? 
bookmarkedPosition.position : 0)); + } + } + + return commitLogPositions; + } + + private void processOldCommitLogs() + { + try + { + if (this.bookmark.isValidBookmark()) + { + SortedSet<CommitLogPosition> validCommitLogs = new TreeSet<>(Collections.reverseOrder()); + validCommitLogs.addAll(CommitLogReader.getSortedCommitLogPositionsSinceCommitLog( + this.bookmark.getPersistedBookmark())); + for (CommitLogPosition commitLogPosition : validCommitLogs) + { + + this.commitLogReadRequests.addFirst(Pair.create(Paths.get(DatabaseDescriptor.getCDCLogLocation(), + new CommitLogDescriptor(commitLogPosition.segmentId, null, null) + .cdcIndexFileName()), + commitLogPosition.position)); + } + } + } + catch (Exception ex) + { + logger.error("Error when processing commit logs since the last bookmark: {}. Start with a fresh " + + "data dump from required tables", ex.getMessage()); + } + } + + private void processCDCWatchEvent(Pair<Path, Integer> idxPath) + { + Path indexFilePath = idxPath.left; + Integer savedOffset = idxPath.right; + + if (indexFilePath == null) + { + logger.error("Index file path is empty."); + return; + } + + if (savedOffset > 0) + { + prevOffset = savedOffset; + } + + logger.debug("Processing a commit log segment"); + + Path indexFileName = indexFilePath.getFileName(); + if (indexFileName == null || !CommitLogReader.isValidIndexFile(indexFileName.toString())) + { + logger.error("File is not an index file : {} ", indexFilePath.toString()); + return; + } + String commitLogFile = indexFileName.toString().replace(IDX_FILENAME_SUFFIX, FILENAME_EXTENSION); + Path parentDir = indexFilePath.getParent(); + if (parentDir == null) + { + logger.error("Unable to parse commit log file path from {} ", indexFilePath.toString()); + return; + } + Path commitLogPath = Paths.get(parentDir.toString(), commitLogFile); + + try + { + int offset; + long segmentId; + logger.debug("Opening the idx file {}", indexFilePath.toString()); + try (BufferedReader reader = new BufferedReader( + new 
InputStreamReader(new 
FileInputStream(indexFilePath.toFile()), Charset.defaultCharset()), + 128)) + { + logger.debug("Reader {}", reader); + offset = Integer.parseInt(reader.readLine()); + logger.debug("Offset is {}", offset); + segmentId = CommitLogDescriptor.fromFileName(commitLogFile).id; + logger.debug("Segment id is {}", segmentId); + + // BEGIN : TODO + // We are switching to a newer commit log and deleting the last one. We need to ensure the validity of + // the bookmark so it remains valid if the CDC reader crashes before observing changes from the new + // commit log. Failing to do so will trigger a fresh snapshot upon restarting the CDC reader. + // END : TODO + // TODO : Do this asynchronously, hand over to a background cleaner thread. + if (this.prevCommitLogFilePath != null && + !this.prevCommitLogFilePath.toString().equals(commitLogPath.toString())) + { + boolean suc = this.prevCommitLogFilePath.toFile().delete(); + logger.info("{} the old file {}", suc ? "Deleted" : "Could not delete", + this.prevCommitLogFilePath.toString()); + prevOffset = 0; + } + if (this.prevIndexFilePath != null && + !this.prevIndexFilePath.toString().equals(indexFilePath.toString())) + { + boolean suc = this.prevIndexFilePath.toFile().delete(); + logger.info("{} the old CDC idx file {}", suc ? 
"Deleted" : "Could not delete", + this.prevIndexFilePath); + } + } + catch (NumberFormatException ex) + { + logger.error("Error when reading offset/segment id from the idx file {} : {}", indexFilePath.toString(), + ex.getMessage()); + throw ex; + } + catch (Exception ex) + { + logger.error("Error when deleting the old file {} : {}", this.prevCommitLogFilePath, + ex.getMessage()); + throw ex; + } + + logger.info("Reading from the commit log file {} at offset {}", commitLogPath.toString(), prevOffset); + this.prevCommitLogFilePath = commitLogPath; + this.prevIndexFilePath = indexFilePath; + + + CommitLogPosition clp = new CommitLogPosition(segmentId, prevOffset); + commitLogReader.readCommitLogSegment(this.mutationHandler, commitLogPath.toFile(), clp, -1, + false); + prevOffset = offset; + } + catch (Throwable ex) + { + logger.error("Error when processing a commit log segment : {}", ex.getMessage(), ex); + } + logger.debug("Commit log segment processed."); + + } + + public void stop() + { + this.running = false; + this.executor.shutdownNow(); // interrupts the worker, which otherwise stays blocked in take() + this.mutationHandler.stop(); + } +} diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/MutationHandler.java b/src/main/java/org/apache/cassandra/sidecar/cdc/MutationHandler.java new file mode 100644 index 000000000..ddbf24c92 --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/MutationHandler.java @@ -0,0 +1,121 @@ +package org.apache.cassandra.sidecar.cdc; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.inject.Inject; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.commitlog.CommitLogDescriptor; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.db.commitlog.CommitLogReadHandler; +import 
org.apache.cassandra.db.partitions.PartitionUpdate; +import 
org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.sidecar.Configuration; +import org.apache.cassandra.sidecar.cdc.output.Output; +import org.apache.cassandra.sidecar.metrics.cdc.CDCReaderMonitor; +/** + * Implements Cassandra CommitLogReadHandler, dandles mutations read from Cassandra commit logs. + */ +public class MutationHandler implements CommitLogReadHandler +{ + private static final Logger logger = LoggerFactory.getLogger(MutationHandler.class); + private Output output; + Future mutationFuture = null; + private ExecutorService executor; + private CDCReaderMonitor monitor; + private CDCBookmark bookmark; + + @Inject + public MutationHandler(Configuration conf, CDCReaderMonitor monitor, CDCBookmark cdcBookmark, Output output) + { + this.output = output; + this.executor = Executors.newSingleThreadExecutor(); + this.monitor = monitor; + this.bookmark = cdcBookmark; + } + + @Override + public boolean shouldSkipSegmentOnError(CommitLogReadException exception) throws IOException + { + return false; + } + + @Override + public void handleUnrecoverableError(CommitLogReadException exception) throws IOException + { + } + + @Override + public void handleMutation(Mutation mutation, int size, int entryLocation, CommitLogDescriptor desc) + { + if (mutation == null || !mutation.trackedByCDC()) + { + return; + } + + if (output == null) + { + logger.error("Output is not initialized"); + } + + logger.debug("Started handling a mutation of the keyspace : {} at offset {}", mutation.getKeyspaceName(), + entryLocation); + + // Pipeline Mutation reading and de-serializing with sending to the output. + //TODO: Multiple threads can process Mutations in parallel; hence use a thread pool. Be careful to design + // bookmarks and commit log deletion to work with multiple threads. Also benchmark the CPU usage of a pool. 
+ if (mutationFuture != null) + { + try + { + CommitLogPosition completedPosition = mutationFuture.get(); + logger.debug("Completed sending data at offset {} : {}", completedPosition.segmentId, + completedPosition.position); + this.bookmark.setLastProcessedPosition(completedPosition); + this.monitor.incSentSuccess(); + } + //TODO: Re-try logic at the mutation level, with exponential backoff and alerting + catch (Exception e) + { + logger.error("Error sending data at offset {} : {}", e.getMessage()); + this.monitor.incSentFailure(); + } + } + mutationFuture = executor.submit(() -> + { + CommitLogPosition commitLogPosition = new CommitLogPosition(desc.id, entryLocation); + for (PartitionUpdate partitionUpdate : mutation.getPartitionUpdates()) + { + // TODO: bounded number of retries at the partition level. + try + { + output.emitChange(new Change(PayloadType.PARTITION_UPDATE, MessagingService.current_version, + Change.CHANGE_EVENT, partitionUpdate)); + } + catch (Exception ex) + { + logger.error("Error when sending data at the offset : {}", ex.getMessage()); + throw ex; + } + } + logger.debug("Done sending data at offset {}", commitLogPosition.position); + return commitLogPosition; + }); + } + + public void stop() + { + try + { + this.output.close(); + } + catch (IOException ioex) + { + logger.error("Error when closing the Output : {}", ioex.getMessage()); + } + this.executor.shutdown(); + } +} diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/PayloadType.java b/src/main/java/org/apache/cassandra/sidecar/cdc/PayloadType.java new file mode 100644 index 000000000..120dd457f --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/PayloadType.java @@ -0,0 +1,23 @@ +package org.apache.cassandra.sidecar.cdc; + +/** + * Enum representing the payload payloadType of a Change + */ +public enum PayloadType +{ + PARTITION_UPDATE((byte) 0), + MUTATION((byte) 1), + URI((byte) 2); + + private byte value; + + PayloadType(byte value) + { + this.value = value; + } 
+ + public byte getValue() + { + return this.value; + } +} diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/output/ConsoleOutput.java b/src/main/java/org/apache/cassandra/sidecar/cdc/output/ConsoleOutput.java new file mode 100644 index 000000000..1a87489bc --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/output/ConsoleOutput.java @@ -0,0 +1,52 @@ +package org.apache.cassandra.sidecar.cdc.output; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.inject.Inject; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.sidecar.cdc.Change; +/** + * Output that logs the contents of Cassandra PartitionUpdates instead of sending them anywhere. + */ +public class ConsoleOutput implements Output +{ + + private static final Logger logger = LoggerFactory.getLogger(ConsoleOutput.class); + + @Inject + ConsoleOutput() + { + } + + @Override + public void emitChange(Change change) throws Exception + { + if (change == null || change.getPartitionUpdateObject() == null) + { + return; + } + PartitionUpdate partition = change.getPartitionUpdateObject(); + logger.info("Handling a partition with the column family : {}", partition.metadata().name); + String pkStr = partition.metadata().partitionKeyType.getString(partition.partitionKey() + .getKey()); + logger.info("> Partition Key : {}", pkStr); + + if (partition.staticRow().columns().size() > 0) + { + logger.info("> -- Static columns : {} ", partition.staticRow().toString(partition.metadata(), false)); + } + UnfilteredRowIterator ri = partition.unfilteredIterator(); + while (ri.hasNext()) + { + Unfiltered r = ri.next(); + logger.info("> -- Row contents: {}", r.toString(partition.metadata(), false)); + } + } + + @Override + public void close() + { + } +} diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/output/Output.java 
b/src/main/java/org/apache/cassandra/sidecar/cdc/output/Output.java new file mode 100644 index 000000000..0259feac9 --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/output/Output.java @@ -0,0 +1,12 @@ +package org.apache.cassandra.sidecar.cdc.output; + +import java.io.Closeable; +import org.apache.cassandra.sidecar.cdc.Change; + +/** + * Interface for emitting Cassandra PartitionUpdates + */ +public interface Output extends Closeable +{ + void emitChange(Change change) throws Exception; +} diff --git a/src/main/java/org/apache/cassandra/sidecar/metrics/cdc/CDCReaderMonitor.java b/src/main/java/org/apache/cassandra/sidecar/metrics/cdc/CDCReaderMonitor.java new file mode 100644 index 000000000..114daeb35 --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/metrics/cdc/CDCReaderMonitor.java @@ -0,0 +1,11 @@ +package org.apache.cassandra.sidecar.metrics.cdc; + +/** + * Interface for collecting metrics from the CDC reader. + */ +public interface CDCReaderMonitor +{ + void incSentSuccess(); + void incSentFailure(); + void reportCdcRawDirectorySizeInBytes(long dirSize); +} diff --git a/src/main/java/org/apache/cassandra/sidecar/metrics/cdc/CDCReaderMonitorLogger.java b/src/main/java/org/apache/cassandra/sidecar/metrics/cdc/CDCReaderMonitorLogger.java new file mode 100644 index 000000000..706a18151 --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/metrics/cdc/CDCReaderMonitorLogger.java @@ -0,0 +1,39 @@ +package org.apache.cassandra.sidecar.metrics.cdc; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.inject.Inject; +import com.google.inject.Singleton; + + +/** + * Implements the monitor interface by writing each reported metric to the log. 
+ */ +@Singleton +public class CDCReaderMonitorLogger implements CDCReaderMonitor +{ + private static final Logger logger = LoggerFactory.getLogger(CDCReaderMonitorLogger.class); + + @Inject + public CDCReaderMonitorLogger() + { + } + + @Override + public void incSentSuccess() + { + logger.info("Successfully sent a commit log entry"); + } + + @Override + public void incSentFailure() + { + logger.info("Failed to send a commit log entry"); + } + + @Override + public void reportCdcRawDirectorySizeInBytes(long dirSize) + { + logger.info("Size of the cdc_raw dir is {}", dirSize); + } +} From 0642027c42a5a92cbd52fc3661c6e85d98ca3dc8 Mon Sep 17 00:00:00 2001 From: Tharanga Gamaethige Date: Wed, 4 Nov 2020 16:12:54 -0800 Subject: [PATCH 2/4] WIP version of the Cass schema change listener that can refresh the metadata as it changes in the server. --- .../sidecar/cdc/CDCReaderService.java | 8 +- .../sidecar/cdc/CDCSchemaChangeListener.java | 186 ++++++++++++++++++ 2 files changed, 192 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/apache/cassandra/sidecar/cdc/CDCSchemaChangeListener.java diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java index 065f29164..7cc25c61f 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java @@ -23,15 +23,17 @@ public class CDCReaderService implements Host.StateListener private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor; private final CQLSession session; private final CassandraConfig cassandraConfig; + private final CDCSchemaChangeListener cdcSchemaChangeListener; @Inject public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session, - CassandraConfig cassandraConfig) + CassandraConfig cassandraConfig, CDCSchemaChangeListener cdcSchemaChangeListener) { 
this.cdcRawDirectoryMonitor = monitor; this.cdcIndexWatcher = cdcIndexWatcher; this.session = session; this.cassandraConfig = cassandraConfig; + this.cdcSchemaChangeListener = cdcSchemaChangeListener; } public synchronized void start() @@ -53,7 +55,9 @@ public synchronized void start() // TODO : Load metadata from the CQLSession. Schema.instance.loadFromDisk(false); this.cassandraConfig.muteConfigs(); - + // Register a schema change listener. In the future, this allows us to update metadata upon + // schema changes without restarting the side car + cluster.register(this.cdcSchemaChangeListener); for (String keySpace : Schema.instance.getKeyspaces()) { logger.info("Keyspace : {}", keySpace); diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCSchemaChangeListener.java b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCSchemaChangeListener.java new file mode 100644 index 000000000..642c55cf6 --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCSchemaChangeListener.java @@ -0,0 +1,186 @@ +package org.apache.cassandra.sidecar.cdc; + +import javax.inject.Singleton; + +import com.datastax.driver.core.AggregateMetadata; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.FunctionMetadata; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.MaterializedViewMetadata; +import com.datastax.driver.core.SchemaChangeListener; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.UserType; +import com.google.inject.Inject; +import io.vertx.core.logging.Logger; +import io.vertx.core.logging.LoggerFactory; +//import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +//import org.apache.cassandra.schema.Schema; +//import org.apache.cassandra.schema.TableId; +//import org.apache.cassandra.schema.Tables; + +/** + * Schema change listener to update CDC reader's keyspace/column family metadata. 
+ */ +@Singleton +public class CDCSchemaChangeListener implements SchemaChangeListener +{ + private static final Logger logger = LoggerFactory.getLogger(CDCSchemaChangeListener.class); + + @Inject + public CDCSchemaChangeListener() + { + } + + // TODO : Add/modify/remove KeySpaces + @Override + public void onKeyspaceAdded(KeyspaceMetadata keyspace) + { + + } + + @Override + public void onKeyspaceRemoved(KeyspaceMetadata keyspace) + { + + } + + @Override + public void onKeyspaceChanged(KeyspaceMetadata current, KeyspaceMetadata previous) + { + + } + + // Tables + @Override + public void onTableAdded(TableMetadata table) + { +// try +// { +// org.apache.cassandra.schema.KeyspaceMetadata keyspaceMetadata = Schema.instance +// .getKeyspaceMetadata(table.getKeyspace().getName()); +// org.apache.cassandra.schema.TableMetadata newTable = CreateTableStatement.parse(table.asCQLQuery(), +// table.getKeyspace().getName()) +// .build(TableId.fromUUID(table.getId())); +// Tables tables = keyspaceMetadata.tables +// .with(newTable); +// org.apache.cassandra.schema.KeyspaceMetadata updatedKeyspaceMetadata = keyspaceMetadata +// .withSwapped(tables); +// Schema.instance.load(updatedKeyspaceMetadata); +// } +// catch (Exception ex) +// { +// logger.error("Failed to update metadata on onTableAdded : {}", ex.getMessage()); +// } + } + + @Override + public void onTableRemoved(TableMetadata table) + { +// try +// { +// org.apache.cassandra.schema.KeyspaceMetadata keyspaceMetadata = Schema.instance +// .getKeyspaceMetadata(table.getKeyspace().getName()); +// Tables tables = keyspaceMetadata.tables +// .without(table.getName()); +// org.apache.cassandra.schema.KeyspaceMetadata updatedKeyspaceMetadata = keyspaceMetadata +// .withSwapped(tables); +// Schema.instance.load(updatedKeyspaceMetadata); +// } +// catch (Exception ex) +// { +// logger.error("Failed to update metadata on onTableRemoved : {}", ex.getMessage()); +// } + } + + @Override + public void onTableChanged(TableMetadata current, 
TableMetadata previous) + { + onTableRemoved(previous); + onTableAdded(current); + } + + // Types + @Override + public void onUserTypeAdded(UserType type) + { + + } + + @Override + public void onUserTypeRemoved(UserType type) + { + + } + + @Override + public void onUserTypeChanged(UserType current, UserType previous) + { + + } + + @Override + public void onFunctionAdded(FunctionMetadata function) + { + + } + + @Override + public void onFunctionRemoved(FunctionMetadata function) + { + + } + + @Override + public void onFunctionChanged(FunctionMetadata current, FunctionMetadata previous) + { + + } + + @Override + public void onAggregateAdded(AggregateMetadata aggregate) + { + + } + + @Override + public void onAggregateRemoved(AggregateMetadata aggregate) + { + + } + + @Override + public void onAggregateChanged(AggregateMetadata current, AggregateMetadata previous) + { + + } + + @Override + public void onMaterializedViewAdded(MaterializedViewMetadata view) + { + + } + + @Override + public void onMaterializedViewRemoved(MaterializedViewMetadata view) + { + + } + + @Override + public void onMaterializedViewChanged(MaterializedViewMetadata current, MaterializedViewMetadata previous) + { + + } + + @Override + public void onRegister(Cluster cluster) + { + + } + + @Override + public void onUnregister(Cluster cluster) + { + + } +} From b6eece2c7593d34f85bd53b99869d89c067f8cd3 Mon Sep 17 00:00:00 2001 From: Tharanga Gamaethige Date: Wed, 4 Nov 2020 16:20:52 -0800 Subject: [PATCH 3/4] Adding Apache licensing headers. 
--- .../cassandra/sidecar/cdc/CDCBookmark.java | 18 ++++++++++++++++++ .../cassandra/sidecar/cdc/CDCIndexWatcher.java | 18 ++++++++++++++++++ .../sidecar/cdc/CDCRawDirectoryMonitor.java | 18 ++++++++++++++++++ .../sidecar/cdc/CDCReaderService.java | 18 ++++++++++++++++++ .../sidecar/cdc/CDCSchemaChangeListener.java | 18 ++++++++++++++++++ .../cassandra/sidecar/cdc/CassandraConfig.java | 18 ++++++++++++++++++ .../apache/cassandra/sidecar/cdc/Change.java | 18 ++++++++++++++++++ .../cassandra/sidecar/cdc/CommitLogReader.java | 18 ++++++++++++++++++ .../cassandra/sidecar/cdc/MutationHandler.java | 18 ++++++++++++++++++ .../cassandra/sidecar/cdc/PayloadType.java | 18 ++++++++++++++++++ .../sidecar/cdc/output/ConsoleOutput.java | 18 ++++++++++++++++++ .../cassandra/sidecar/cdc/output/Output.java | 18 ++++++++++++++++++ .../sidecar/metrics/cdc/CDCReaderMonitor.java | 18 ++++++++++++++++++ .../metrics/cdc/CDCReaderMonitorLogger.java | 18 ++++++++++++++++++ 14 files changed, 252 insertions(+) diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCBookmark.java b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCBookmark.java index bd4914773..fa41435a4 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCBookmark.java +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCBookmark.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.sidecar.cdc; diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCIndexWatcher.java b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCIndexWatcher.java index 733ec9eec..5fec9c8da 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCIndexWatcher.java +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCIndexWatcher.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.cassandra.sidecar.cdc; import java.nio.file.FileSystems; diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCRawDirectoryMonitor.java b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCRawDirectoryMonitor.java index d8a9cd468..8bf43103c 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCRawDirectoryMonitor.java +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCRawDirectoryMonitor.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.sidecar.cdc; import java.io.IOException; diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java index 7cc25c61f..36dd1b7fb 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.sidecar.cdc; import java.io.InvalidObjectException; diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCSchemaChangeListener.java b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCSchemaChangeListener.java index 642c55cf6..b43e50156 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCSchemaChangeListener.java +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCSchemaChangeListener.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.cassandra.sidecar.cdc; import javax.inject.Singleton; diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/CassandraConfig.java b/src/main/java/org/apache/cassandra/sidecar/cdc/CassandraConfig.java index 1c4b8d9bc..336b7008e 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cdc/CassandraConfig.java +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/CassandraConfig.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.sidecar.cdc; import java.io.File; diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/Change.java b/src/main/java/org/apache/cassandra/sidecar/cdc/Change.java index 74096e5c8..135dcf2f7 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cdc/Change.java +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/Change.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.sidecar.cdc; import java.nio.ByteBuffer; diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/CommitLogReader.java b/src/main/java/org/apache/cassandra/sidecar/cdc/CommitLogReader.java index a0a6970b8..5588b612b 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cdc/CommitLogReader.java +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/CommitLogReader.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.cassandra.sidecar.cdc; import java.io.BufferedReader; diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/MutationHandler.java b/src/main/java/org/apache/cassandra/sidecar/cdc/MutationHandler.java index ddbf24c92..e1c162803 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cdc/MutationHandler.java +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/MutationHandler.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.sidecar.cdc; import java.io.IOException; diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/PayloadType.java b/src/main/java/org/apache/cassandra/sidecar/cdc/PayloadType.java index 120dd457f..ae1e60f30 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cdc/PayloadType.java +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/PayloadType.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.sidecar.cdc; /** diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/output/ConsoleOutput.java b/src/main/java/org/apache/cassandra/sidecar/cdc/output/ConsoleOutput.java index 1a87489bc..facd32017 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cdc/output/ConsoleOutput.java +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/output/ConsoleOutput.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.cassandra.sidecar.cdc.output; import org.slf4j.Logger; diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/output/Output.java b/src/main/java/org/apache/cassandra/sidecar/cdc/output/Output.java index 0259feac9..250562776 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cdc/output/Output.java +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/output/Output.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.sidecar.cdc.output; import java.io.Closeable; diff --git a/src/main/java/org/apache/cassandra/sidecar/metrics/cdc/CDCReaderMonitor.java b/src/main/java/org/apache/cassandra/sidecar/metrics/cdc/CDCReaderMonitor.java index 114daeb35..aa77d0060 100644 --- a/src/main/java/org/apache/cassandra/sidecar/metrics/cdc/CDCReaderMonitor.java +++ b/src/main/java/org/apache/cassandra/sidecar/metrics/cdc/CDCReaderMonitor.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.sidecar.metrics.cdc; /** diff --git a/src/main/java/org/apache/cassandra/sidecar/metrics/cdc/CDCReaderMonitorLogger.java b/src/main/java/org/apache/cassandra/sidecar/metrics/cdc/CDCReaderMonitorLogger.java index 706a18151..9bd631867 100644 --- a/src/main/java/org/apache/cassandra/sidecar/metrics/cdc/CDCReaderMonitorLogger.java +++ b/src/main/java/org/apache/cassandra/sidecar/metrics/cdc/CDCReaderMonitorLogger.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.cassandra.sidecar.metrics.cdc; import org.slf4j.Logger; From a2ebd3dce09c1c962b4c3c5a08f20e3e5cd5f36c Mon Sep 17 00:00:00 2001 From: Tharanga Gamaethige Date: Wed, 4 Nov 2020 16:34:52 -0800 Subject: [PATCH 4/4] Using the common CQLSession class. --- .../apache/cassandra/sidecar/CQLSession.java | 115 ------------------ .../sidecar/cdc/CDCReaderService.java | 2 +- 2 files changed, 1 insertion(+), 116 deletions(-) delete mode 100644 src/main/java/org/apache/cassandra/sidecar/CQLSession.java diff --git a/src/main/java/org/apache/cassandra/sidecar/CQLSession.java b/src/main/java/org/apache/cassandra/sidecar/CQLSession.java deleted file mode 100644 index d199a94f2..000000000 --- a/src/main/java/org/apache/cassandra/sidecar/CQLSession.java +++ /dev/null @@ -1,115 +0,0 @@ -package org.apache.cassandra.sidecar; - -import java.net.InetSocketAddress; -import java.util.Collections; - -import javax.annotation.Nullable; - -import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.NettyOptions; -import com.datastax.driver.core.QueryOptions; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.policies.ExponentialReconnectionPolicy; -import com.datastax.driver.core.policies.ReconnectionPolicy; -import com.datastax.driver.core.policies.RoundRobinPolicy; -import com.datastax.driver.core.policies.WhiteListPolicy; -import com.google.inject.Inject; -import com.google.inject.Singleton; - -/** - * Represents a connection to Cassandra cluster. Currently supports returning the local connection only as - * defined in the Configuration. 
- */ -@Singleton -public class CQLSession -{ - private static final Logger logger = LoggerFactory.getLogger(CQLSession.class); - @Nullable - private Session localSession; - private final InetSocketAddress inet; - private final WhiteListPolicy wlp; - private NettyOptions nettyOptions; - private QueryOptions queryOptions; - private ReconnectionPolicy reconnectionPolicy; - - @Inject - public CQLSession(Configuration configuration) - { - inet = InetSocketAddress.createUnresolved(configuration.getCassandraHost(), configuration.getCassandraPort()); - wlp = new WhiteListPolicy(new RoundRobinPolicy(), Collections.singletonList(inet)); - this.nettyOptions = new NettyOptions(); - this.queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE); - this.reconnectionPolicy = new ExponentialReconnectionPolicy(1000, - configuration.getHealthCheckFrequencyMillis()); - } - - @VisibleForTesting - CQLSession(InetSocketAddress target, NettyOptions options) - { - inet = target; - wlp = new WhiteListPolicy(new RoundRobinPolicy(), Collections.singletonList(inet)); - this.nettyOptions = options; - this.queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE); - reconnectionPolicy = new ExponentialReconnectionPolicy(100, 1000); - } - - /** - * Provides a Session connected only to the local node from configuration. If null it means the the connection was - * not able to be established. The session still might throw a NoHostAvailableException if the local host goes - * offline or otherwise unavailable. 
- * - * @return Session - */ - @Nullable - public synchronized Session getLocalCql() - { - Cluster cluster = null; - try - { - if (localSession == null) - { - cluster = Cluster.builder() - .addContactPointsWithPorts(inet) - .withLoadBalancingPolicy(wlp) - .withQueryOptions(queryOptions) - .withReconnectionPolicy(reconnectionPolicy) - .withoutMetrics() - // tests can create a lot of these Cluster objects, to avoid creating HWTs and - // event thread pools for each we have the override - .withNettyOptions(nettyOptions) - .build(); - localSession = cluster.connect(); - } - } - catch (Exception e) - { - logger.debug("Failed to reach Cassandra", e); - if (cluster != null) - { - try - { - cluster.close(); - } - catch (Exception ex) - { - logger.debug("Failed to close cluster in cleanup", ex); - } - } - } - return localSession; - } - - public synchronized void close() - { - if (localSession != null) - { - localSession.getCluster().close(); - localSession = null; - } - } -} diff --git a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java index 36dd1b7fb..067945bfe 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java +++ b/src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java @@ -28,7 +28,7 @@ import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.sidecar.CQLSession; +import org.apache.cassandra.sidecar.common.CQLSession; /** * Cassandra's real-time change data capture service.