Skip to content

Commit

Permalink
Revert "Make Pulsar connections work on non-default public/default te…
Browse files Browse the repository at this point in the history
…nant and namespace (LangStream#13)"

This reverts commit c15c8c4.
  • Loading branch information
nicoloboschi committed May 23, 2024
1 parent b9907b2 commit 9e621df
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ private void processRecord(Record record, RecordSink recordSink) {
MutableRecord context = MutableRecord.recordToMutableRecord(record, true);
final JsonRecord jsonRecord = context.toJsonRecord();

log.debug("Processing JSON record {}", jsonRecord.toString());
String fileName = objectTemplate.execute(jsonRecord);

log.info("Processing file {}", fileName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,24 +88,15 @@ void testProcess() throws Exception {
for (int i = 0; i < 2; i++) {
String s = content + i;
minioClient.putObject(
PutObjectArgs.builder().bucket(bucket).object("test'-" + i + ".txt").stream(
PutObjectArgs.builder().bucket(bucket).object("test-" + i + ".txt").stream(
new ByteArrayInputStream(s.getBytes(StandardCharsets.UTF_8)),
s.length(),
-1)
.build());
}
// List objects from the bucket
Iterable<Result<Item>> results =
minioClient.listObjects(ListObjectsArgs.builder().bucket(bucket).build());

for (Result<Item> result : results) {
Item item = result.get();
// Display the object name in the logs
System.out.println(item.objectName());
}

// Create a input record that specifies the first file
String objectName = "test'-0.txt";
String objectName = "test-0.txt";
SimpleRecord someRecord =
SimpleRecord.builder()
.value("{\"objectName\": \"" + objectName + "\"}")
Expand Down Expand Up @@ -161,7 +152,7 @@ void testProcess() throws Exception {
foundOrigHeader.isPresent()); // Check that the object name is passed in the record

// Get the next file
String secondObjectName = "test'-1.txt";
String secondObjectName = "test-1.txt";
someRecord =
SimpleRecord.builder()
.value("{\"objectName\": \"" + secondObjectName + "\"}")
Expand Down Expand Up @@ -400,7 +391,7 @@ private AgentProcessor buildAgentProcessor(String bucket) throws Exception {
String endpoint = localstack.getEndpointOverride(S3).toString();
configs.put("endpoint", endpoint);
configs.put("bucketName", bucket);
configs.put("objectName", "{{{ value.objectName }}}");
configs.put("objectName", "{{ value.objectName }}");
agent.init(configs);
AgentContext context = mock(AgentContext.class);
when(context.getMetricsReporter()).thenReturn(MetricsReporter.DISABLED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected StreamingCluster getStreamingCluster() {
"service",
Map.of("serviceUrl", pulsarContainer.getBrokerUrl()),
"default-tenant",
"staging",
"public",
"default-namespace",
"default"));
}
Expand All @@ -61,7 +61,7 @@ public ApplicationStore store() {
serviceUrl: "%s"
service:
serviceUrl: "%s"
default-tenant: "staging"
default-tenant: "public"
default-namespace: "default"
computeCluster:
type: "none"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import ai.langstream.api.storage.ApplicationStore;
import ai.langstream.apigateway.config.GatewayTestAuthenticationProperties;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
Expand All @@ -40,23 +39,11 @@ protected StreamingCluster getStreamingCluster() {
"service",
Map.of("serviceUrl", pulsarContainer.getBrokerUrl()),
"default-tenant",
"staging",
"public",
"default-namespace",
"default"));
}

@Override
@Test
public void testSimpleProduceConsume() throws Exception {
super.testSimpleProduceConsume();
}

@Override
@Test
public void testSendEvents() throws Exception {
super.testSendEvents();
}

@TestConfiguration
public static class WebSocketTestConfig {

Expand All @@ -73,7 +60,7 @@ public ApplicationStore store() {
serviceUrl: "%s"
service:
serviceUrl: "%s"
default-tenant: "staging"
default-tenant: "public"
default-namespace: "default"
computeCluster:
type: "none"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -121,18 +120,7 @@ public TopicReader createReader(
Map<String, Object> configuration,
TopicOffsetPosition initialPosition) {
Map<String, Object> copy = new HashMap<>(configuration);
// Get tenant/namespace from streamingCluster configuration or default to public/default
String tenant = "public";
String namespace = "default";
// Log the streamingCluster configuration
log.info("Streaming cluster configuration: {}", streamingCluster.configuration());
if (streamingCluster.configuration() != null) {
Map<String, Object> clusterConfiguration = streamingCluster.configuration();
tenant = (String) clusterConfiguration.getOrDefault("default-tenant", tenant);
namespace =
(String) clusterConfiguration.getOrDefault("default-namespace", namespace);
}
return new PulsarTopicReader(copy, tenant, namespace, initialPosition);
return new PulsarTopicReader(copy, initialPosition);
}

@Override
Expand All @@ -142,16 +130,7 @@ public TopicConsumer createConsumer(
Map<String, Object> configuration) {
Map<String, Object> copy = new HashMap<>(configuration);
copy.remove("deadLetterTopicProducer");
// Get tenant/namespace from streamingCluster configuration or default to public/default
String tenant = "public";
String namespace = "default";
if (streamingCluster.configuration() != null) {
Map<String, Object> clusterConfiguration = streamingCluster.configuration();
tenant = (String) clusterConfiguration.getOrDefault("default-tenant", tenant);
namespace =
(String) clusterConfiguration.getOrDefault("default-namespace", namespace);
}
return new PulsarTopicConsumer(copy, tenant, namespace);
return new PulsarTopicConsumer(copy);
}

@Override
Expand All @@ -160,16 +139,7 @@ public TopicProducer createProducer(
StreamingCluster streamingCluster,
Map<String, Object> configuration) {
Map<String, Object> copy = new HashMap<>(configuration);
// Get tenant/namespace from streamingCluster configuration or default to public/default
String tenant = "public";
String namespace = "default";
if (streamingCluster.configuration() != null) {
Map<String, Object> clusterConfiguration = streamingCluster.configuration();
tenant = (String) clusterConfiguration.getOrDefault("default-tenant", tenant);
namespace =
(String) clusterConfiguration.getOrDefault("default-namespace", namespace);
}
return new PulsarTopicProducer<>(copy, tenant, namespace);
return new PulsarTopicProducer<>(copy);
}

@Override
Expand Down Expand Up @@ -251,18 +221,6 @@ private static void deployTopic(PulsarAdmin admin, PulsarTopic topic)
+ topic.name().namespace()
+ "/"
+ topic.name().name();
// Check if tenant exists
if (!admin.tenants().getTenants().contains(topic.name().tenant())) {
log.info("Creating tenant {}", topic.name().tenant());
// Clone tenant roles and allowed clusters from the public tenant
TenantInfo publicTenantInfo = admin.tenants().getTenantInfo("public");
admin.tenants().createTenant(topic.name().tenant(), publicTenantInfo);
}
// Check if namespace exists
if (!admin.namespaces().getNamespaces(topic.name().tenant()).contains(namespace)) {
log.info("Creating namespace {}", namespace);
admin.namespaces().createNamespace(namespace);
}
log.info("Listing topics in namespace {}", namespace);
List<String> existing;
if (topic.partitions() <= 0) {
Expand Down Expand Up @@ -468,17 +426,9 @@ private class PulsarTopicReader implements TopicReader {

private Reader<GenericRecord> reader;

private String tenant;
private String namespace;

private PulsarTopicReader(
Map<String, Object> configuration,
String tenant,
String namespace,
TopicOffsetPosition initialPosition) {
Map<String, Object> configuration, TopicOffsetPosition initialPosition) {
this.configuration = configuration;
this.tenant = tenant;
this.namespace = namespace;
this.startMessageId =
switch (initialPosition.position()) {
case Earliest -> MessageId.earliest;
Expand All @@ -497,13 +447,10 @@ private PulsarTopicReader(

@Override
public void start() throws Exception {
// Add tenant and namespace to get the fully qualified topic name
String topic = (String) configuration.remove("topic");
String fullyQualifiedTopic = "%s/%s/%s".formatted(tenant, namespace, topic);
log.info("Starting reader for topic {}", fullyQualifiedTopic);
reader =
client.newReader(Schema.AUTO_CONSUME())
.topic(fullyQualifiedTopic)
.topic(topic)
.startMessageId(this.startMessageId)
.loadConf(configuration)
.receiverQueueSize(5) // To limit the number of messages in flight
Expand Down Expand Up @@ -573,14 +520,9 @@ private class PulsarTopicConsumer implements TopicConsumer {
Consumer<GenericRecord> consumer;

private final AtomicLong totalOut = new AtomicLong();
private String tenant;
private String namespace;

public PulsarTopicConsumer(
Map<String, Object> configuration, String tenant, String namespace) {
public PulsarTopicConsumer(Map<String, Object> configuration) {
this.configuration = configuration;
this.tenant = tenant;
this.namespace = namespace;
}

@Override
Expand All @@ -591,15 +533,12 @@ public Object getNativeConsumer() {
@Override
public void start() throws Exception {
String topic = (String) configuration.remove("topic");
// Add tenant and namespace to get the fully qualified topic name
String fullyQualifiedTopic = "%s/%s/%s".formatted(tenant, namespace, topic);
log.info("Starting consumer for topic {}", fullyQualifiedTopic);
consumer =
client.newConsumer(Schema.AUTO_CONSUME())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Failover)
.loadConf(configuration)
.topic(fullyQualifiedTopic)
.topic(topic)
.receiverQueueSize(5) // To limit the number of messages in flight
.subscribe();
}
Expand Down Expand Up @@ -673,22 +612,14 @@ private class PulsarTopicProducer<K> implements TopicProducer {
entry(Instant.class, Schema.INSTANT),
entry(ByteBuffer.class, Schema.BYTEBUFFER));

public PulsarTopicProducer(
Map<String, Object> configuration, String tenant, String namespace) {
public PulsarTopicProducer(Map<String, Object> configuration) {
this.configuration = configuration;
this.tenant = tenant;
this.namespace = namespace;
}

@Override
@SneakyThrows
public void start() {
String baseTopic = (String) configuration.remove("topic");
// Add tenant and namespace to get the fully qualified topic name
topic = "%s/%s/%s".formatted(tenant, namespace, baseTopic);
log.info("Starting producer for topic {}", topic);
// If the producer has a specified schema, we intialize it here
// Otherwise, we will infer the schema from the first message in the write method
topic = (String) configuration.remove("topic");
if (configuration.containsKey("valueSchema")) {
SchemaDefinition valueSchemaDefinition =
mapper.convertValue(
Expand Down

0 comments on commit 9e621df

Please sign in to comment.