Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement PscMetadataClient #44

Merged
merged 7 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions psc/src/main/java/com/pinterest/psc/common/TopicRn.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.pinterest.psc.logging.PscLogger;

import java.io.IOException;
import java.util.Objects;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -111,6 +112,10 @@ public String getTopicRnPrefixString() {
return topicRnPrefixString;
}

public String getStandard() {
return standard;
}

public String getService() {
return service;
}
Expand Down Expand Up @@ -167,6 +172,22 @@ private static String upgradeTopicRnToCurrentVersion(String topicRnAsStr, byte s
throw new TopicRnSyntaxException(String.format("Unsupported topic RN version %d", serializedVersion));
}

@Override
public int hashCode() {
return Objects.hash(
topicRnString,
topicRnPrefixString,
standard,
service,
environment,
cloud,
region,
classifier,
cluster,
topic
);
}

@Override
public boolean equals(Object other) {
if (this == other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public TopicUriPartition(String topicUriStr, int partition) {
}
}

protected TopicUriPartition(TopicUri topicUri, int partition) {
public TopicUriPartition(TopicUri topicUri, int partition) {
this.backendTopicUri = topicUri;
this.topicUriStr = topicUri.getTopicUriAsString();
this.partition = partition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,12 @@ public class PscConfiguration extends PropertiesConfiguration {
public static final String PSC_PRODUCER_SSL_TRUSTSTORE_TYPE = PSC_PRODUCER + "." + SSL_TRUSTSTORE_TYPE;
*/

// **********************
// MetadataClient Configuration
// **********************

protected static final String PSC_METADATA = "psc.metadata";
public static final String PSC_METADATA_CLIENT_ID = PSC_METADATA + "." + CLIENT_ID;

// **********************
// Metrics Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public class PscConfigurationInternal {
private final static String PSC_CLIENT_TYPE = "psc.client.type";
public final static String PSC_CLIENT_TYPE_CONSUMER = "consumer";
public final static String PSC_CLIENT_TYPE_PRODUCER = "producer";
private final static String[] PSC_VALID_CLIENT_TYPES = {PSC_CLIENT_TYPE_CONSUMER, PSC_CLIENT_TYPE_PRODUCER};
public final static String PSC_CLIENT_TYPE_METADATA = "metadata";
private final static String[] PSC_VALID_CLIENT_TYPES = {PSC_CLIENT_TYPE_CONSUMER, PSC_CLIENT_TYPE_PRODUCER, PSC_CLIENT_TYPE_METADATA};

private PscConfiguration pscConfiguration;
private Deserializer keyDeserializer, valueDeserializer;
Expand Down Expand Up @@ -140,6 +141,9 @@ protected void validate(boolean isLenient, boolean isLogConfiguration) throws Co
case PSC_CLIENT_TYPE_PRODUCER:
validateProducerConfiguration(isLenient, isLogConfiguration);
break;
case PSC_CLIENT_TYPE_METADATA:
validateMetadataClientConfiguration(isLenient, isLogConfiguration);
break;
default:
throw new ConfigurationException("Valid client type expected: " + String.join(", ", PSC_VALID_CLIENT_TYPES));
}
Expand Down Expand Up @@ -483,6 +487,24 @@ private <T> T verifyConfigHasValue(
return configuration.get(expectedType, configKey);
}

private void validateMetadataClientConfiguration(boolean isLenient, boolean isLogConfiguration) throws ConfigurationException {
PscConfiguration metadataConfiguration = new PscConfiguration();
metadataConfiguration.copy(pscConfiguration.subset(PscConfiguration.PSC_METADATA));
Map<String, Exception> invalidConfigs = new HashMap<>();
verifyConfigHasValue(metadataConfiguration, PscConfiguration.CLIENT_ID, String.class, invalidConfigs);
if (isLogConfiguration)
logConfiguration();

if (invalidConfigs.isEmpty() || isLenient)
return;

StringBuilder stringBuilder = new StringBuilder();
invalidConfigs.forEach((error, exception) ->
stringBuilder.append(String.format("\t%s: %s\n", error, exception == null ? "" : exception.getMessage()))
);
throw new ConfigurationException("Invalid metadataClient configuration\n" + stringBuilder.toString());
}

private void validateProducerConfiguration(boolean isLenient, boolean isLogConfiguration) throws ConfigurationException {
PscConfiguration producerConfiguration = new PscConfiguration();
producerConfiguration.copy(pscConfiguration.subset(PscConfiguration.PSC_PRODUCER));
Expand Down Expand Up @@ -749,4 +771,8 @@ public int getAutoResolutionRetryCount() {
public MetricsReporterConfiguration getMetricsReporterConfiguration() {
return metricsReporterConfiguration;
}

public String getMetadataClientId() {
return pscConfiguration.getString(PscConfiguration.PSC_METADATA_CLIENT_ID);
}
}
27 changes: 27 additions & 0 deletions psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.pinterest.psc.metadata;

import com.google.common.annotations.VisibleForTesting;
import com.pinterest.psc.common.TopicRn;
import com.pinterest.psc.common.TopicUri;

/**
* Utility class for common metadata logic
*/
public class MetadataUtils {

@VisibleForTesting
public static TopicRn createTopicRn(TopicUri topicUri, String topicName) {
return new TopicRn(
topicUri.getTopicRn().getTopicRnPrefixString() + topicName,
topicUri.getTopicRn().getTopicRnPrefixString(),
topicUri.getTopicRn().getStandard(),
topicUri.getTopicRn().getService(),
topicUri.getTopicRn().getEnvironment(),
topicUri.getTopicRn().getCloud(),
topicUri.getTopicRn().getRegion(),
topicUri.getTopicRn().getClassifier(),
topicUri.getTopicRn().getCluster(),
topicName
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.pinterest.psc.metadata;

import com.pinterest.psc.common.TopicRn;
import com.pinterest.psc.common.TopicUriPartition;

import java.util.List;

/**
* Metadata for a {@link TopicRn}, including the list of its partitions
*/
public class TopicRnMetadata {

private final TopicRn topicRn;
private final List<TopicUriPartition> topicUriPartitions;

public TopicRnMetadata(TopicRn topicRn, List<TopicUriPartition> topicUriPartitions) {
this.topicRn = topicRn;
this.topicUriPartitions = topicUriPartitions;
}

public TopicRn getTopicRn() {
return topicRn;
}

public List<TopicUriPartition> getTopicUriPartitions() {
return topicUriPartitions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.pinterest.psc.metadata.client;

import com.google.common.annotations.VisibleForTesting;
import com.pinterest.psc.common.MessageId;
import com.pinterest.psc.common.ServiceDiscoveryConfig;
import com.pinterest.psc.common.TopicRn;
import com.pinterest.psc.common.TopicUri;
import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.config.PscConfigurationInternal;
import com.pinterest.psc.discovery.ServiceDiscoveryManager;
import com.pinterest.psc.environment.Environment;
import com.pinterest.psc.exception.startup.ConfigurationException;
import com.pinterest.psc.metadata.TopicRnMetadata;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* An abstract class that defines the interface for metadata queries and operations. Specific implementations
* of this class should be created for each backend, such as Kafka, MemQ, etc.
*/
public abstract class PscBackendMetadataClient implements AutoCloseable {

protected TopicUri topicUri;
protected PscConfigurationInternal pscConfigurationInternal;
protected ServiceDiscoveryConfig discoveryConfig;

public void initialize(TopicUri topicUri, Environment env, PscConfigurationInternal pscConfigurationInternal) throws ConfigurationException {
this.topicUri = topicUri;
this.pscConfigurationInternal = pscConfigurationInternal;
this.discoveryConfig =
ServiceDiscoveryManager.getServiceDiscoveryConfig(env, pscConfigurationInternal.getDiscoveryConfiguration(), topicUri);
}

public abstract List<TopicRn> listTopicRns(
long timeout,
TimeUnit timeUnit
) throws ExecutionException, InterruptedException, TimeoutException;

public abstract Map<TopicRn, TopicRnMetadata> describeTopicRns(
Collection<TopicRn> topicRns,
long timeout,
TimeUnit timeUnit
) throws ExecutionException, InterruptedException, TimeoutException;

public abstract Map<TopicUriPartition, MessageId> listOffsets(
Map<TopicUriPartition, PscMetadataClient.MetadataClientOption> topicRnsAndOptions,
long timeout,
TimeUnit timeUnit
) throws ExecutionException, InterruptedException, TimeoutException;

public abstract Map<TopicUriPartition, MessageId> listOffsetsForConsumerGroup(
String consumerGroupId,
Collection<TopicUriPartition> topicUriPartitions,
long timeout,
TimeUnit timeUnit
) throws ExecutionException, InterruptedException, TimeoutException;

public abstract void close() throws Exception;

@VisibleForTesting
protected TopicUri getTopicUri() {
return topicUri;
}
}
Loading
Loading