(WIP) CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27 #18

Open
wants to merge 4 commits into
base: trunk
1 change: 1 addition & 0 deletions build.gradle
Expand Up @@ -179,6 +179,7 @@ dependencies {

compile project(":common")
compile project(":cassandra40")
compile 'org.apache.cassandra:cassandra-all:4.0-beta2'
Contributor

If you take a look at the last commit I added, I spent a lot of time trying to decouple the sidecar from using a specific version of Cassandra. Each version we decide to support can (and will) have an adapter, allowing us to maintain a single sidecar project that can work with different versions of Cassandra each of which has different implementations. There's no assurance that C* 5.0 will have the same CDC implementation as the 4.0 version. Could you please move the version specific logic into the cassandra40 subproject?

In addition, we may want to have the user point to their cassandra lib directory as well in order to not ship every version of C* with the sidecar. That will give us the flexibility for folks to use their own builds (private or public) as well as ship a smaller artifact. Since everyone has to run Cassandra I think this is a fair ask. Using a compileOnly dependency would allow us to test against each version of C* without shipping the jars.
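
A minimal sketch of the decoupling requested here, in plain Java. All names in it (`CdcReaderAdapter`, `Cassandra40CdcReader`, `AdapterRegistry`) are hypothetical and not taken from the sidecar codebase; it only illustrates the core module depending on an interface while each version-specific subproject supplies the implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Version-neutral contract the core sidecar would code against (hypothetical name). */
interface CdcReaderAdapter
{
    /** Version prefix of the Cassandra line this adapter targets. */
    String supportedVersion();

    void start();
}

/** Would live in a version-specific subproject such as cassandra40 (hypothetical class). */
class Cassandra40CdcReader implements CdcReaderAdapter
{
    private boolean started;

    public String supportedVersion()
    {
        return "4.0";
    }

    public void start()
    {
        // Version-specific commit log reading would go here.
        started = true;
    }

    boolean isStarted()
    {
        return started;
    }
}

/** Picks an adapter by version prefix; unknown versions fail loudly. */
class AdapterRegistry
{
    private final Map<String, CdcReaderAdapter> adapters = new LinkedHashMap<>();

    void register(CdcReaderAdapter adapter)
    {
        adapters.put(adapter.supportedVersion(), adapter);
    }

    CdcReaderAdapter forVersion(String cassandraVersion)
    {
        for (Map.Entry<String, CdcReaderAdapter> entry : adapters.entrySet())
        {
            if (cassandraVersion.startsWith(entry.getKey()))
            {
                return entry.getValue();
            }
        }
        throw new IllegalArgumentException("No CDC adapter for Cassandra " + cassandraVersion);
    }
}
```

With this shape, only the subproject holding `Cassandra40CdcReader` would need the Cassandra jars on its compile classpath, which is where a `compileOnly` dependency would fit.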

Author

@rustyrazorblade +1 for both suggestions. I was thinking of punting this to a future commit, but I see the work you've done at a4805a910904019698ae373ac33f88855cf67f3d. Let me refactor this code to address both points.

}

task copyCodeStyle(type: Copy) {
Expand Up @@ -193,7 +193,8 @@ public void start() throws ApiException, InterruptedException, CassandraPodExcep
}

started = namespacedPod.getStatus().getContainerStatuses().get(0).getStarted();
- if (namespacedPod.getStatus().getContainerStatuses().get(0).getReady() && started) {
+ if (namespacedPod.getStatus().getContainerStatuses().get(0).getReady() && started)
+ {
logger.info("Pod startup OK");
break;
}
3 changes: 3 additions & 0 deletions src/main/dist/conf/sidecar.yaml
Expand Up @@ -24,3 +24,6 @@ sidecar:

healthcheck:
- poll_freq_millis: 30000

cdc:

I think this could change from "cdc" to "cassandra config file", since we may have other uses for the cassandra.yaml path besides CDC.

I also think this config path should go under the top-level "cassandra:" section of sidecar.yaml.

Author

I'd wait for a general need and then refactor it out of the cdc section. We can do it now if there's such a need.

- configPath: file:////etc/cassandra/conf/cassandra.yaml
Expand Up @@ -27,8 +27,8 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.vertx.core.http.HttpServer;
import org.apache.cassandra.sidecar.cdc.CDCReaderService;
import org.apache.cassandra.sidecar.utils.SslUtils;

/**
* Main class for initiating the Cassandra sidecar
*/
Expand All @@ -38,19 +38,22 @@ public class CassandraSidecarDaemon
private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class);
private final HttpServer server;
private final Configuration config;
private final CDCReaderService cdcReaderService;

@Inject
- public CassandraSidecarDaemon(HttpServer server, Configuration config)
+ public CassandraSidecarDaemon(HttpServer server, Configuration config, CDCReaderService cdcReaderService)
{
this.server = server;
this.config = config;
this.cdcReaderService = cdcReaderService;
}

public void start()
{
banner(System.out);
validate();
logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort());
cdcReaderService.start();

Should we add a log statement for the CDC reader service?

Author

You mean a log saying the CDCReaderService started? There is such a log statement in that class: logger.info("Successfully started the CDC reader");

yes

Besides, do you think we should add a flag to enable or disable the CDC reader service?

Author

Good suggestion. Let me add that, so users who don't need this can just keep it disabled.

yes

I also think we could add a common method, such as startInitService(), in which all such services are started.
To me, cdcReaderService is a service that starts when the sidecar initializes, and a flag could control
whether each service is started.
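
One way to picture the suggestion in this thread is sketched below: a single start-up method that walks a list of optional services and starts only the enabled ones. `StartableService`, `ServiceEntry`, `InitServices`, and `RecordingService` are hypothetical names for illustration; the PR does not define them, and where the enable flag would live in sidecar.yaml is left open.

```java
import java.util.ArrayList;
import java.util.List;

/** Anything the daemon starts during initialization (hypothetical interface). */
interface StartableService
{
    String name();

    void start();
}

/** Pairs a service with its enable flag, for example one read from configuration. */
class ServiceEntry
{
    final StartableService service;
    final boolean enabled;

    ServiceEntry(StartableService service, boolean enabled)
    {
        this.service = service;
        this.enabled = enabled;
    }
}

class InitServices
{
    /** Starts every enabled service in order and returns the names that were started. */
    static List<String> startInitServices(List<ServiceEntry> entries)
    {
        List<String> started = new ArrayList<>();
        for (ServiceEntry entry : entries)
        {
            if (!entry.enabled)
            {
                continue; // disabled services are skipped
            }
            entry.service.start();
            started.add(entry.service.name());
        }
        return started;
    }
}

/** Trivial service used only to exercise the sketch. */
class RecordingService implements StartableService
{
    private final String name;
    boolean started;

    RecordingService(String name)
    {
        this.name = name;
    }

    public String name()
    {
        return name;
    }

    public void start()
    {
        started = true;
    }
}
```

Returning the started names gives the daemon one place to log what actually came up, which would also cover the logging question raised earlier in this thread.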

server.listen(config.getPort(), config.getHost());
}

28 changes: 26 additions & 2 deletions src/main/java/org/apache/cassandra/sidecar/Configuration.java
Expand Up @@ -55,12 +55,17 @@ public class Configuration

private final boolean isSslEnabled;

/* Cassandra server conf path */
@Nullable
private String cassandraConfigPath;

public Configuration(String cassandraHost, Integer cassandraPort, String host, Integer port,
Integer healthCheckFrequencyMillis, boolean isSslEnabled,
@Nullable String keyStorePath,
@Nullable String keyStorePassword,
@Nullable String trustStorePath,
- @Nullable String trustStorePassword)
+ @Nullable String trustStorePassword,
+ @Nullable String cassandraConfigPath)
{
this.cassandraHost = cassandraHost;
this.cassandraPort = cassandraPort;
Expand All @@ -73,6 +78,8 @@ public Configuration(String cassandraHost, Integer cassandraPort, String host, I
this.trustStorePath = trustStorePath;
this.trustStorePassword = trustStorePassword;
this.isSslEnabled = isSslEnabled;

this.cassandraConfigPath = cassandraConfigPath;
}

/**
Expand Down Expand Up @@ -179,6 +186,15 @@ public String getTruststorePassword()
return trustStorePassword;
}

/**
* Get path of the Cassandra configuration file
*/
@Nullable
public String getCassandraConfigPath()
{
return cassandraConfigPath;
}

/**
* Configuration Builder
*/
Expand All @@ -194,6 +210,7 @@ public static class Builder
private String trustStorePath;
private String trustStorePassword;
private boolean isSslEnabled;
private String cassandraConfigPath;

public Builder setCassandraHost(String host)
{
Expand Down Expand Up @@ -255,10 +272,17 @@ public Builder setSslEnabled(boolean enabled)
return this;
}

public Builder setCassandraConfigPath(String configPath)
{
this.cassandraConfigPath = configPath;
return this;
}

public Configuration build()
{
return new Configuration(cassandraHost, cassandraPort, host, port, healthCheckFrequencyMillis, isSslEnabled,
- keyStorePath, keyStorePassword, trustStorePath, trustStorePassword);
+ keyStorePath, keyStorePassword, trustStorePath, trustStorePassword,
+ cassandraConfigPath);
}
}
}
13 changes: 13 additions & 0 deletions src/main/java/org/apache/cassandra/sidecar/MainModule.java
Expand Up @@ -41,9 +41,13 @@
import io.vertx.ext.web.handler.LoggerHandler;
import io.vertx.ext.web.handler.StaticHandler;
import org.apache.cassandra.sidecar.cassandra40.Cassandra40Factory;
import org.apache.cassandra.sidecar.cdc.output.ConsoleOutput;
import org.apache.cassandra.sidecar.cdc.output.Output;
import org.apache.cassandra.sidecar.common.CQLSession;
import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
import org.apache.cassandra.sidecar.metrics.cdc.CDCReaderMonitor;
import org.apache.cassandra.sidecar.metrics.cdc.CDCReaderMonitorLogger;
import org.apache.cassandra.sidecar.routes.HealthService;
import org.apache.cassandra.sidecar.routes.SwaggerOpenApiResource;
import org.jboss.resteasy.plugins.server.vertx.VertxRegistry;
Expand Down Expand Up @@ -126,6 +130,14 @@ public Router vertxRouter(Vertx vertx)
return router;
}

@Override
protected void configure()
{
// TODO: Make the output type configurable
bind(CDCReaderMonitor.class).to(CDCReaderMonitorLogger.class);
bind(Output.class).to(ConsoleOutput.class);

Can we remove this method and, as with HttpServer/Vertx/VertxRequestHandler/Router/Configuration, use @Provides to supply the actual value? That way the output can be made configurable.
Here is my code:
@Provides
@Singleton
public Output outPut(Configuration conf)
{
String outPutClass = conf.getOutPut();
if (!outPutClass.contains("."))
outPutClass = "org.apache.cassandra.sidecar.common.output." + outPutClass; // I moved the output classes to the common dir
Output output = FBUtilities.construct(outPutClass, "output");
return output;
}

Author

Output is an interface. Let me see whether there's a benefit to refactoring it like this.

Contributor

I'm not seeing an advantage to the request, @Maxwell-Guo. The approach @tharanga took is pretty standard for a Guice binding.
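
For comparison with the static `bind(Output.class).to(ConsoleOutput.class)` binding, the class-name lookup proposed in this thread can be sketched with plain Java reflection, without Guice or `FBUtilities`. `OutputLoader`, `resolveClassName`, and `construct` are illustrative names; the default package string is borrowed from the snippet in the comment and is not confirmed by the PR.

```java
class OutputLoader
{
    // Package prefix applied to short names; borrowed from the review comment's snippet.
    static final String DEFAULT_PACKAGE = "org.apache.cassandra.sidecar.common.output.";

    /** Expands a short class name to a fully qualified one; qualified names pass through. */
    static String resolveClassName(String configured)
    {
        return configured.contains(".") ? configured : DEFAULT_PACKAGE + configured;
    }

    /** Instantiates the named class via its no-argument constructor and checks its type. */
    static <T> T construct(String className, Class<T> expectedType)
    {
        try
        {
            Class<?> clazz = Class.forName(className);
            Object instance = clazz.getDeclaredConstructor().newInstance();
            return expectedType.cast(instance);
        }
        catch (ReflectiveOperationException | ClassCastException e)
        {
            // Misconfiguration only surfaces here, at runtime.
            throw new IllegalArgumentException("Cannot construct " + className, e);
        }
    }
}
```

The trade-off is visible in the catch block: a linked binding is checked by the compiler, whereas a configured class name can only fail once the sidecar starts.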

}

@Provides
@Singleton
public Configuration configuration() throws ConfigurationException, IOException
Expand All @@ -151,6 +163,7 @@ public Configuration configuration() throws ConfigurationException, IOException
.setTrustStorePath(yamlConf.get(String.class, "sidecar.ssl.truststore.path", null))
.setTrustStorePassword(yamlConf.get(String.class, "sidecar.ssl.truststore.password", null))
.setSslEnabled(yamlConf.get(Boolean.class, "sidecar.ssl.enabled", false))
.setCassandraConfigPath(yamlConf.get(String.class, "cdc.configPath"))

As I said before, the Cassandra config path could also go under the "cassandra" section of sidecar.yaml.

.build();
}
catch (MalformedURLException e)
231 changes: 231 additions & 0 deletions src/main/java/org/apache/cassandra/sidecar/cdc/CDCBookmark.java
@@ -0,0 +1,231 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.sidecar.cdc;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.sidecar.Configuration;

/**
* Manages the CDC reader bookmark. This tracks the last successfully processed offset
* of a commit log.
*/
@Singleton
public class CDCBookmark extends TimerTask
{
// Tracks last disk sync'd commit log position.
private CommitLogPosition lastSyncedPosition;
// Tracks last successfully processed commit log position by the CDC reader.
private CommitLogPosition lastProcessedPosition;
private final Timer timer;
private static final String BOOKMARK = "CdcReader.bookmark";
private static final Logger logger = LoggerFactory.getLogger(CDCBookmark.class);
private final ReentrantLock bookmarkLock = new ReentrantLock();
private final Configuration conf;

@Inject
CDCBookmark(Configuration conf)

Configuration conf is not used in this class. Will it be used in the future? The same applies to the MutationHandler class.

Author

+1

{
this.lastSyncedPosition = null;
this.lastProcessedPosition = null;
this.conf = conf;
this.timer = new Timer();
}

/**
* Persists last successfully processed commit log offset to the disk.
*/
public void syncBookmark()
{
CommitLogPosition lastPosition = this.getLastProcessedPosition();

if (lastPosition == null)
{
return;
}
logger.debug("Last processed bookmark {}", this.lastProcessedPosition.toString());
try
{
if (lastPosition.equals(this.lastSyncedPosition))
{
return;
}

CommitLogPosition.CommitLogPositionSerializer serializer =
new CommitLogPosition.CommitLogPositionSerializer();

// TODO: JSON ser-de and write-rename instead of writing directly to the bookmark
try (FileOutputStream fileOutputStream = new FileOutputStream(
new File(this.getBookmarkPath())))
{
DataOutputPlus outBuffer = new DataOutputBuffer();
serializer.serialize(lastPosition, outBuffer);
fileOutputStream.write(((DataOutputBuffer) outBuffer).getData());
fileOutputStream.flush();
this.lastSyncedPosition = lastPosition;
logger.info("Successfully synced bookmark {} to the file {}", this.lastSyncedPosition.toString(),
this.getBookmarkPath());
}
catch (IOException e)
{
logger.error("Error when writing bookmark {} to the file {}", lastPosition.toString(),
this.getBookmarkPath());
}
}
catch (Exception ex)
{
logger.error("Sync exception {}", ex.getMessage());
}
}

/**
* Gets the path to the CDC reader bookmark.
*
* @return complete path to the bookmark file.
*/
public String getBookmarkPath()
{
return String.format("%s/%s", DatabaseDescriptor.getCDCLogLocation(),
BOOKMARK);
}

@Override
public void run()
{
this.syncBookmark();
}

/**
* Gets the last successfully processed commit log offset.
* This method is thread safe.
*
* @return last successfully processed commit log offset.
*/
public CommitLogPosition getLastProcessedPosition()
{
CommitLogPosition lastPosition = null;
try
{
bookmarkLock.lock();
if (this.lastProcessedPosition != null)
{
lastPosition = new CommitLogPosition(this.lastProcessedPosition.segmentId,
this.lastProcessedPosition.position);

}
}
finally
{
bookmarkLock.unlock();
}
return lastPosition;
}

/**
* Sets the last successfully processed commit log offset.
* This method is thread safe.
*
*/
public void setLastProcessedPosition(CommitLogPosition processedPosition)
{
try
{
bookmarkLock.lock();
this.lastProcessedPosition = processedPosition;
}
finally
{
bookmarkLock.unlock();
}
}

/**
* Starts the background thread to write processed commit log positions to the disk.
* */
public void startBookmarkSync()
{
timer.schedule(this, 0, DatabaseDescriptor.getCommitLogSyncPeriod());
}

/**
* Gets the persisted commit log offset from the bookmark on the disk.
*
* @return persisted commit log offset.
*/
public CommitLogPosition getPersistedBookmark()
{
CommitLogPosition.CommitLogPositionSerializer serializer =
new CommitLogPosition.CommitLogPositionSerializer();
try (FileInputStream fileInputStream =
new FileInputStream(new File(this.getBookmarkPath())))
{
DataInputPlus inputBuffer = new DataInputPlus.DataInputStreamPlus(fileInputStream);
return serializer.deserialize(inputBuffer);
}
catch (IOException ex)
{
logger.error("Error when reading the saved bookmark {}", this.getBookmarkPath());
return null;
}
}

/**
* Checks whether there's a valid persisted bookmark.
*
* @return whether there's a valid bookmark.
*/
public boolean isValidBookmark()
{
CommitLogPosition bookmark = getPersistedBookmark();
if (bookmark == null)
{
return false;
}
// It's fine for compression to be null as we are not using this CommitLogDescriptor to write commit logs.
CommitLogDescriptor commitLogDescriptor = new CommitLogDescriptor(bookmark.segmentId, null,
null);

if (commitLogDescriptor == null ||
!Paths.get(DatabaseDescriptor.getCDCLogLocation(),
commitLogDescriptor.cdcIndexFileName()).toFile().exists() ||
!Paths.get(DatabaseDescriptor.getCDCLogLocation(),
commitLogDescriptor.fileName()).toFile().exists())
{
return false;
}
return true;
}
}
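
The TODO in `syncBookmark()` mentions write-rename instead of writing directly to the bookmark file. A minimal sketch of that technique with `java.nio.file` follows. `AtomicBookmarkWriter` and the `.tmp` suffix are assumptions for illustration, and the payload is treated as opaque bytes rather than a serialized `CommitLogPosition`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

class AtomicBookmarkWriter
{
    /** Writes data to a sibling temp file, forces it to disk, then renames it over target. */
    static void write(Path target, byte[] data) throws IOException
    {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileChannel channel = FileChannel.open(tmp,
                                                    StandardOpenOption.CREATE,
                                                    StandardOpenOption.WRITE,
                                                    StandardOpenOption.TRUNCATE_EXISTING))
        {
            ByteBuffer buffer = ByteBuffer.wrap(data);
            while (buffer.hasRemaining())
            {
                channel.write(buffer);
            }
            channel.force(true); // flush contents and metadata before the rename
        }
        try
        {
            // Whether an existing target is replaced is implementation specific;
            // POSIX rename semantics replace it.
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        }
        catch (AtomicMoveNotSupportedException e)
        {
            // Fallback for file systems without atomic rename; this path is not atomic.
            Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```

A reader such as `getPersistedBookmark()` would then see either the previous bookmark or the new one, not a partially written file, on file systems where the rename is atomic.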