(WIP) CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27 #18
base: trunk
Changes from all commits
81a7030
0642027
b6eece2
a2ebd3d
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,3 +24,6 @@ sidecar: | |
|
||
healthcheck: | ||
- poll_freq_millis: 30000 | ||
|
||
cdc: | ||
Review comment: I think this key could be renamed from "cdc" to something like "cassandra config file", since the cassandra.yaml path may have uses beyond CDC.
Review comment: I also think this config path belongs under "cassandra:", as a top-level section of sidecar.yaml.
Review comment: I'd wait for a general need and then refactor it out of the cdc section. We can do it now if there's such a need.
||
- configPath: file:////etc/cassandra/conf/cassandra.yaml |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,8 +27,8 @@ | |
import com.google.inject.Inject; | ||
import com.google.inject.Singleton; | ||
import io.vertx.core.http.HttpServer; | ||
import org.apache.cassandra.sidecar.cdc.CDCReaderService; | ||
import org.apache.cassandra.sidecar.utils.SslUtils; | ||
|
||
/** | ||
* Main class for initiating the Cassandra sidecar | ||
*/ | ||
|
@@ -38,19 +38,22 @@ public class CassandraSidecarDaemon | |
private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class); | ||
private final HttpServer server; | ||
private final Configuration config; | ||
private final CDCReaderService cdcReaderService; | ||
|
||
@Inject | ||
public CassandraSidecarDaemon(HttpServer server, Configuration config) | ||
public CassandraSidecarDaemon(HttpServer server, Configuration config, CDCReaderService cdcReaderService) | ||
{ | ||
this.server = server; | ||
this.config = config; | ||
this.cdcReaderService = cdcReaderService; | ||
} | ||
|
||
public void start() | ||
{ | ||
banner(System.out); | ||
validate(); | ||
logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort()); | ||
cdcReaderService.start(); | ||
Review comment: Add a log for the CDC reader service?
Review comment: You mean a log saying the CDCReaderService started? There is such a log statement in that class:
Review comment: Yes.
Review comment: Besides, do you think we should add a flag to enable or disable the CDC reader service?
Review comment: Good suggestion. Let me add that, so users who don't need this can just keep it disabled.
Review comment: Yes.
Review comment: I also think we can add a common method in which all other services can be added.
||
server.listen(config.getPort(), config.getHost()); | ||
} | ||
|
||
|
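The enable/disable flag and the "common method" for services suggested in the review could look roughly like the sketch below. It is a minimal illustration only: `SidecarService`, `FlaggedService`, and `ServiceStarter` are hypothetical names that do not appear in the PR, and the real `CDCReaderService` may expose a different interface.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a background service such as CDCReaderService.
interface SidecarService
{
    String name();
    boolean isEnabled();
    void start();
}

// Hypothetical service whose enabled flag would come from sidecar.yaml.
final class FlaggedService implements SidecarService
{
    private final String name;
    private final boolean enabled;
    private boolean running;

    FlaggedService(String name, boolean enabled)
    {
        this.name = name;
        this.enabled = enabled;
    }

    public String name()
    {
        return name;
    }

    public boolean isEnabled()
    {
        return enabled;
    }

    public void start()
    {
        running = true;
    }

    boolean isRunning()
    {
        return running;
    }
}

// One place to register services; only the enabled ones are started.
final class ServiceStarter
{
    private final List<SidecarService> services = new ArrayList<>();

    ServiceStarter register(SidecarService service)
    {
        services.add(service);
        return this;
    }

    // Returns the names of the services that were started, in registration order.
    List<String> startAll()
    {
        List<String> started = new ArrayList<>();
        for (SidecarService service : services)
        {
            if (!service.isEnabled())
            {
                continue; // disabled in configuration, leave it stopped
            }
            service.start();
            started.add(service.name());
        }
        return started;
    }
}
```

With something like this, the daemon's `start()` would call a single `startAll()` instead of starting each service by hand, and users who do not need CDC would simply leave its flag off.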
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,9 +41,13 @@ | |
import io.vertx.ext.web.handler.LoggerHandler; | ||
import io.vertx.ext.web.handler.StaticHandler; | ||
import org.apache.cassandra.sidecar.cassandra40.Cassandra40Factory; | ||
import org.apache.cassandra.sidecar.cdc.output.ConsoleOutput; | ||
import org.apache.cassandra.sidecar.cdc.output.Output; | ||
import org.apache.cassandra.sidecar.common.CQLSession; | ||
import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate; | ||
import org.apache.cassandra.sidecar.common.CassandraVersionProvider; | ||
import org.apache.cassandra.sidecar.metrics.cdc.CDCReaderMonitor; | ||
import org.apache.cassandra.sidecar.metrics.cdc.CDCReaderMonitorLogger; | ||
import org.apache.cassandra.sidecar.routes.HealthService; | ||
import org.apache.cassandra.sidecar.routes.SwaggerOpenApiResource; | ||
import org.jboss.resteasy.plugins.server.vertx.VertxRegistry; | ||
|
@@ -126,6 +130,14 @@ public Router vertxRouter(Vertx vertx) | |
return router; | ||
} | ||
|
||
@Override | ||
protected void configure() | ||
{ | ||
// TODO: Make the output type configurable | ||
bind(CDCReaderMonitor.class).to(CDCReaderMonitorLogger.class); | ||
bind(Output.class).to(ConsoleOutput.class); | ||
Review comment: Can we remove this method and, as with the other values such as HttpServer/Vertx/VertxRequestHandler/Router/Configuration, use @Provides to supply the real value, so that the output can be made configurable?
Review comment: I'm not seeing an advantage to the request, @Maxwell-Guo. The approach @tharanga took is pretty standard for a Guice binding.
||
} | ||
|
||
@Provides | ||
@Singleton | ||
public Configuration configuration() throws ConfigurationException, IOException | ||
|
@@ -151,6 +163,7 @@ public Configuration configuration() throws ConfigurationException, IOException | |
.setTrustStorePath(yamlConf.get(String.class, "sidecar.ssl.truststore.path", null)) | ||
.setTrustStorePassword(yamlConf.get(String.class, "sidecar.ssl.truststore.password", null)) | ||
.setSslEnabled(yamlConf.get(Boolean.class, "sidecar.ssl.enabled", false)) | ||
.setCassandraConfigPath(yamlConf.get(String.class, "cdc.configPath")) | ||
Review comment: As I said before, the Cassandra config path could also go under the cassandra section of sidecar.yaml.
||
.build(); | ||
} | ||
catch (MalformedURLException e) | ||
|
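The TODO about making the output type configurable could be approached as in the sketch below, which picks an implementation from a configuration string. It is written in plain Java without Guice so that it stands alone; in the module, the same selection could sit inside a `@Provides` method or behind the existing `bind(...)` call. `CdcOutput`, the two implementations, `CdcOutputFactory`, and the `"console"`/`"none"` values are all hypothetical and are not part of the PR.

```java
import java.util.Locale;

// Hypothetical stand-in for the PR's Output interface.
interface CdcOutput
{
    void emit(String change);
}

// Writes every change to standard output.
final class ConsoleCdcOutput implements CdcOutput
{
    public void emit(String change)
    {
        System.out.println(change);
    }
}

// Drops every change; useful when no sink is wanted.
final class DiscardingCdcOutput implements CdcOutput
{
    public void emit(String change)
    {
        // intentionally empty
    }
}

final class CdcOutputFactory
{
    private CdcOutputFactory()
    {
    }

    // Maps a configured type name to an implementation; null falls back to the console.
    static CdcOutput fromConfig(String type)
    {
        String normalized = type == null ? "console" : type.trim().toLowerCase(Locale.ROOT);
        switch (normalized)
        {
            case "console":
                return new ConsoleCdcOutput();
            case "none":
                return new DiscardingCdcOutput();
            default:
                throw new IllegalArgumentException("Unknown CDC output type: " + type);
        }
    }
}
```

Failing fast on an unknown value keeps a typo in the configuration from silently falling back to the console.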
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,231 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.cassandra.sidecar.cdc; | ||
|
||
tharanga marked this conversation as resolved.
|
||
|
||
import java.io.File; | ||
import java.io.FileInputStream; | ||
import java.io.FileOutputStream; | ||
import java.io.IOException; | ||
import java.nio.file.Paths; | ||
import java.util.Timer; | ||
import java.util.TimerTask; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import com.google.inject.Inject; | ||
import com.google.inject.Singleton; | ||
import org.apache.cassandra.config.DatabaseDescriptor; | ||
import org.apache.cassandra.db.commitlog.CommitLogDescriptor; | ||
import org.apache.cassandra.db.commitlog.CommitLogPosition; | ||
import org.apache.cassandra.io.util.DataInputPlus; | ||
import org.apache.cassandra.io.util.DataOutputBuffer; | ||
import org.apache.cassandra.io.util.DataOutputPlus; | ||
import org.apache.cassandra.sidecar.Configuration; | ||
|
||
/** | ||
* Manages the CDC reader bookmark. This tracks the last successfully processed offset | ||
* of a commit log. | ||
*/ | ||
@Singleton | ||
public class CDCBookmark extends TimerTask | ||
{ | ||
// Tracks last disk sync'd commit log position. | ||
private CommitLogPosition lastSyncedPosition; | ||
// Tracks last successfully processed commit log position by the CDC reader. | ||
private CommitLogPosition lastProcessedPosition; | ||
private final Timer timer; | ||
private static final String BOOKMARK = "CdcReader.bookmark"; | ||
private static final Logger logger = LoggerFactory.getLogger(CDCBookmark.class); | ||
private final ReentrantLock bookmarkLock = new ReentrantLock(); | ||
private final Configuration conf; | ||
|
||
@Inject | ||
CDCBookmark(Configuration conf) | ||
Review comment: Configuration conf is not used in this class. Will it be used in the future? The same applies to the MutationHandler class.
Review comment: +1
||
{ | ||
this.lastSyncedPosition = null; | ||
this.lastProcessedPosition = null; | ||
this.conf = conf; | ||
this.timer = new Timer(); | ||
} | ||
|
||
/** | ||
* Persists last successfully processed commit log offset to the disk. | ||
*/ | ||
public void syncBookmark() | ||
{ | ||
CommitLogPosition lastPosition = this.getLastProcessedPosition(); | ||
|
||
if (lastPosition == null) | ||
{ | ||
return; | ||
} | ||
logger.debug("Last processed bookmark {}", this.lastProcessedPosition.toString()); | ||
try | ||
{ | ||
if (lastPosition.equals(this.lastSyncedPosition)) | ||
{ | ||
return; | ||
} | ||
|
||
CommitLogPosition.CommitLogPositionSerializer serializer = | ||
new CommitLogPosition.CommitLogPositionSerializer(); | ||
|
||
// TODO: JSON ser-de and write-rename instead of writing directly to the bookmark | ||
try (FileOutputStream fileOutputStream = new FileOutputStream( | ||
new File(this.getBookmarkPath()))) | ||
{ | ||
DataOutputPlus outBuffer = new DataOutputBuffer(); | ||
serializer.serialize(lastPosition, outBuffer); | ||
fileOutputStream.write(((DataOutputBuffer) outBuffer).getData()); | ||
fileOutputStream.flush(); | ||
this.lastSyncedPosition = lastPosition; | ||
logger.info("Successfully synced bookmark {} to the file {}", this.lastSyncedPosition.toString(), | ||
this.getBookmarkPath()); | ||
} | ||
catch (IOException e) | ||
{ | ||
logger.error("Error when writing bookmark {} to the file {}", lastPosition.toString(), | ||
this.getBookmarkPath()); | ||
} | ||
} | ||
catch (Exception ex) | ||
{ | ||
logger.error("Sync exception {}", ex.getMessage()); | ||
} | ||
} | ||
|
||
/** | ||
* Gets the path to the CDC reader bookmark. | ||
* | ||
* @return complete path to the bookmark file. | ||
*/ | ||
public String getBookmarkPath() | ||
{ | ||
return String.format("%s/%s", DatabaseDescriptor.getCDCLogLocation(), | ||
BOOKMARK); | ||
} | ||
|
||
@Override | ||
public void run() | ||
{ | ||
this.syncBookmark(); | ||
} | ||
|
||
/** | ||
* Gets the last successfully processed commit log offset. | ||
* This method is thread safe. | ||
* | ||
* @return last successfully processed commit log offset. | ||
*/ | ||
public CommitLogPosition getLastProcessedPosition() | ||
{ | ||
CommitLogPosition lastPosition = null; | ||
try | ||
{ | ||
bookmarkLock.lock(); | ||
if (this.lastProcessedPosition != null) | ||
{ | ||
lastPosition = new CommitLogPosition(this.lastProcessedPosition.segmentId, | ||
this.lastProcessedPosition.position); | ||
|
||
} | ||
} | ||
finally | ||
{ | ||
bookmarkLock.unlock(); | ||
} | ||
return lastPosition; | ||
} | ||
|
||
/** | ||
* Sets the last successfully processed commit log offset. | ||
* This method is thread safe. | ||
* | ||
*/ | ||
public void setLastProcessedPosition(CommitLogPosition processedPosition) | ||
{ | ||
try | ||
{ | ||
bookmarkLock.lock(); | ||
this.lastProcessedPosition = processedPosition; | ||
} | ||
finally | ||
{ | ||
bookmarkLock.unlock(); | ||
} | ||
} | ||
|
||
/** | ||
* Starts the background thread to write processed commit log positions to the disk. | ||
* */ | ||
public void startBookmarkSync() | ||
{ | ||
timer.schedule(this, 0, DatabaseDescriptor.getCommitLogSyncPeriod()); | ||
} | ||
|
||
/** | ||
* Gets the persisted commit log offset from the bookmark on the disk. | ||
* | ||
* @return persisted commit log offset. | ||
*/ | ||
public CommitLogPosition getPersistedBookmark() | ||
{ | ||
CommitLogPosition.CommitLogPositionSerializer serializer = | ||
new CommitLogPosition.CommitLogPositionSerializer(); | ||
try (FileInputStream fileInputStream = | ||
new FileInputStream(new File(this.getBookmarkPath()))) | ||
{ | ||
DataInputPlus inputBuffer = new DataInputPlus.DataInputStreamPlus(fileInputStream); | ||
return serializer.deserialize(inputBuffer); | ||
} | ||
catch (IOException ex) | ||
{ | ||
logger.error("Error when reading the saved bookmark {}", this.getBookmarkPath()); | ||
return null; | ||
} | ||
} | ||
|
||
/** | ||
* Checks whether there's a valid persisted bookmark. | ||
* | ||
* @return whether there's a valid bookmark. | ||
*/ | ||
public boolean isValidBookmark() | ||
{ | ||
CommitLogPosition bookmark = getPersistedBookmark(); | ||
if (bookmark == null) | ||
{ | ||
return false; | ||
} | ||
// It's fine for compression to be null as we are not using this CommitLogDescriptor to write commit logs. | ||
CommitLogDescriptor commitLogDescriptor = new CommitLogDescriptor(bookmark.segmentId, null, | ||
null); | ||
|
||
if (commitLogDescriptor == null || | ||
!Paths.get(DatabaseDescriptor.getCDCLogLocation(), | ||
commitLogDescriptor.cdcIndexFileName()).toFile().exists() || | ||
!Paths.get(DatabaseDescriptor.getCDCLogLocation(), | ||
commitLogDescriptor.fileName()).toFile().exists()) | ||
{ | ||
return false; | ||
} | ||
return true; | ||
} | ||
} |
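The TODO in `syncBookmark()` mentions write-rename instead of writing directly to the bookmark file. A minimal sketch of that idea using only `java.nio.file` follows. `AtomicBookmarkWriter` is a hypothetical helper, not part of the PR, and it takes the already serialized bytes as input. It does not force the data to disk before the rename, so it guards against a half-written bookmark being read, not against power loss.

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: write to a temp file in the same directory, then rename over the target.
final class AtomicBookmarkWriter
{
    private AtomicBookmarkWriter()
    {
    }

    static void write(Path bookmark, byte[] payload) throws IOException
    {
        // The temp file must live in the same directory so the rename stays on one file system.
        Path dir = bookmark.toAbsolutePath().getParent();
        Path tmp = Files.createTempFile(dir, bookmark.getFileName().toString(), ".tmp");
        try
        {
            Files.write(tmp, payload);
            try
            {
                // Whether an existing target is replaced is implementation specific;
                // on POSIX file systems this is a rename that replaces it.
                Files.move(tmp, bookmark, StandardCopyOption.ATOMIC_MOVE);
            }
            catch (AtomicMoveNotSupportedException e)
            {
                // Fallback for file systems without atomic rename; no longer atomic.
                Files.move(tmp, bookmark, StandardCopyOption.REPLACE_EXISTING);
            }
        }
        finally
        {
            // No-op after a successful move; cleans up if writing or moving failed.
            Files.deleteIfExists(tmp);
        }
    }
}
```

A reader that opens the bookmark while a sync is in progress then sees either the previous complete bookmark or the new one, never a partial write.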
Review comment: If you take a look at the last commit I added, I spent a lot of time trying to decouple the sidecar from using a specific version of Cassandra. Each version we decide to support can (and will) have an adapter, allowing us to maintain a single sidecar project that can work with different versions of Cassandra each of which has different implementations. There's no assurance that C* 5.0 will have the same CDC implementation as the 4.0 version. Could you please move the version specific logic into the cassandra40 subproject?
In addition, we may want to have the user point to their cassandra lib directory as well in order to not ship every version of C* with the sidecar. That will give us the flexibility for folks to use their own builds (private or public) as well as ship a smaller artifact. Since everyone has to run Cassandra I think this is a fair ask. Using a compileOnly dependency would allow us to test against each version of C* without shipping the jars.
Review comment: @rustyrazorblade +1 for both suggestions. I was thinking of punting this to a future commit, but I see the work you've done at a4805a910904019698ae373ac33f88855cf67f3d. Let me refactor this code to address both points.
Review comment: @rustyrazorblade +1
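The per-version adapter idea raised in this thread could be sketched roughly as follows: version-specific CDC logic sits behind an interface, and a registry picks the implementation from the Cassandra release version. Every name here (`CdcSupport`, `Cassandra40CdcSupport`, `CdcSupportRegistry`) is hypothetical and only illustrates the shape; the sidecar's actual adapter classes may be organized differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical interface for CDC logic that differs between Cassandra versions.
interface CdcSupport
{
    // Name of the subproject that hosts this implementation.
    String subproject();
}

// Hypothetical 4.0 implementation; in the layout discussed above it would live in cassandra40.
final class Cassandra40CdcSupport implements CdcSupport
{
    public String subproject()
    {
        return "cassandra40";
    }
}

// Looks up the CDC implementation by major version, e.g. "4.0.1" resolves to 4.
final class CdcSupportRegistry
{
    private final Map<Integer, Supplier<CdcSupport>> byMajor = new HashMap<>();

    CdcSupportRegistry register(int major, Supplier<CdcSupport> factory)
    {
        byMajor.put(major, factory);
        return this;
    }

    CdcSupport forVersion(String releaseVersion)
    {
        int major = majorOf(releaseVersion);
        Supplier<CdcSupport> factory = byMajor.get(major);
        if (factory == null)
        {
            throw new UnsupportedOperationException("No CDC support registered for Cassandra " + releaseVersion);
        }
        return factory.get();
    }

    // Takes the text before the first '.' or '-' as the major version number.
    static int majorOf(String releaseVersion)
    {
        String head = releaseVersion.trim().split("[.-]", 2)[0];
        return Integer.parseInt(head);
    }
}
```

Rejecting an unregistered version outright reflects the concern above that a later release may not share the 4.0 CDC implementation.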