Skip to content

Commit

Permalink
Upgrade/Downgrade test
Browse files Browse the repository at this point in the history
  • Loading branch information
dlg99 committed Jun 27, 2024
1 parent 2da4ee8 commit 6a4cff8
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 16 deletions.
15 changes: 15 additions & 0 deletions .github/workflows/pulsar-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,9 @@ jobs:
- name: Metrics
group: METRICS

- name: Upgrade
group: UPGRADE

steps:
- name: checkout
uses: actions/checkout@v4
Expand Down Expand Up @@ -648,6 +651,18 @@ jobs:
run: |
$GITHUB_WORKSPACE/build/pulsar_ci_tool.sh docker_load_image_from_github_actions_artifacts pulsar-java-test-image
- name: Pull apachepulsar/pulsar:2.10.6
run: |
docker pull apachepulsar/pulsar:2.10.6
- name: Pull apachepulsar/pulsar:3.0.5
run: |
docker pull apachepulsar/pulsar:3.0.5
- name: Pull alpine:3.20.1
run: |
docker pull alpine:3.20.1
- name: Run setup commands
if: ${{ matrix.setup }}
run: |
Expand Down
4 changes: 4 additions & 0 deletions build/run_integration_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ test_group_standalone() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-standalone.xml -DintegrationTests
}

test_group_upgrade() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-upgrade.xml -DintegrationTests
}

test_group_transaction() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-transaction.xml -DintegrationTests
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,28 @@ public class PulsarCluster {
* @return the built pulsar cluster
*/
public static PulsarCluster forSpec(PulsarClusterSpec spec) {
return forSpec(spec, Network.newNetwork());
}

public static PulsarCluster forSpec(PulsarClusterSpec spec, Network network) {
checkArgument(network != null, "Network should not be null");
CSContainer csContainer = null;
if (!spec.enableOxia) {
csContainer = new CSContainer(spec.clusterName)
.withNetwork(Network.newNetwork())
.withNetwork(network)
.withNetworkAliases(CSContainer.NAME);
}
return new PulsarCluster(spec, csContainer, false);
return new PulsarCluster(spec, network, csContainer, false);
}

public static PulsarCluster forSpec(PulsarClusterSpec spec, CSContainer csContainer) {
return new PulsarCluster(spec, csContainer, true);
return new PulsarCluster(spec, csContainer.getNetwork(), csContainer, true);
}

@Getter
private final PulsarClusterSpec spec;

public boolean closeNetworkOnExit = true;
@Getter
private final String clusterName;
private final Network network;
Expand All @@ -108,19 +114,18 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec, CSContainer csContai
private final String metadataStoreUrl;
private final String configurationMetadataStoreUrl;

private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean sharedCsContainer) {

private PulsarCluster(PulsarClusterSpec spec, Network network, CSContainer csContainer, boolean sharedCsContainer) {
this.spec = spec;
this.sharedCsContainer = sharedCsContainer;
this.clusterName = spec.clusterName();
if (csContainer != null ) {
if (network != null) {
this.network = network;
} else if (csContainer != null) {
this.network = csContainer.getNetwork();
} else {
this.network = Network.newNetwork();
}



if (spec.enableOxia) {
this.zkContainer = null;
this.oxiaContainer = new OxiaContainer(clusterName);
Expand Down Expand Up @@ -203,7 +208,9 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s
.withEnv("PULSAR_PREFIX_diskUsageWarnThreshold", "0.95")
.withEnv("diskUsageThreshold", "0.99")
.withEnv("PULSAR_PREFIX_diskUsageLwmThreshold", "0.97")
.withEnv("nettyMaxFrameSizeBytes", String.valueOf(spec.maxMessageSize));
.withEnv("nettyMaxFrameSizeBytes", String.valueOf(spec.maxMessageSize))
.withEnv("ledgerDirectories", "data/bookkeeper/" + name + "/ledgers")
.withEnv("journalDirectory", "data/bookkeeper/" + name + "/journal");
if (spec.bookkeeperEnvs != null) {
bookieContainer.withEnv(spec.bookkeeperEnvs);
}
Expand Down Expand Up @@ -262,10 +269,27 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s
}
));

if (spec.dataContainer != null) {
if (!sharedCsContainer && csContainer != null) {
csContainer.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE);
}
if (zkContainer != null) {
zkContainer.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE);
}
proxyContainer.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE);

bookieContainers.values().forEach(c -> c.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE));
brokerContainers.values().forEach(c -> c.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE));
workerContainers.values().forEach(c -> c.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE));
}

spec.classPathVolumeMounts.forEach((key, value) -> {
if (zkContainer != null) {
zkContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE);
}
if (!sharedCsContainer && csContainer != null) {
csContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE);
}
proxyContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE);

bookieContainers.values().forEach(c -> c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
Expand Down Expand Up @@ -323,6 +347,10 @@ public Map<String, GenericContainer<?>> getExternalServices() {
}

public void start() throws Exception {
start(true);
}

public void start(boolean doInit) throws Exception {

if (!spec.enableOxia) {
// start the local zookeeper
Expand All @@ -338,7 +366,7 @@ public void start() throws Exception {
oxiaContainer.start();
}

{
if (doInit) {
// Run cluster metadata initialization
@Cleanup
PulsarInitMetadataContainer init = new PulsarInitMetadataContainer(
Expand Down Expand Up @@ -453,10 +481,12 @@ public synchronized void stop() {
oxiaContainer.stop();
}

try {
network.close();
} catch (Exception e) {
log.info("Failed to shutdown network for pulsar cluster {}", clusterName, e);
if (closeNetworkOnExit) {
try {
network.close();
} catch (Exception e) {
log.info("Failed to shutdown network for pulsar cluster {}", clusterName, e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ public class PulsarClusterSpec {
@Builder.Default
Map<String, String> classPathVolumeMounts = new TreeMap<>();

/**
* Data container
*/
@Builder.Default
GenericContainer<?> dataContainer = null;

/**
* Pulsar Test Image Name
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ protected void beforeStartCluster() throws Exception {
}

protected void setupCluster(PulsarClusterSpec spec) throws Exception {
setupCluster(spec, true);
}

protected void setupCluster(PulsarClusterSpec spec, boolean doInit) throws Exception {
incrementSetupNumber();
log.info("Setting up cluster {} with {} bookies, {} brokers",
spec.clusterName(), spec.numBookies(), spec.numBrokers());
Expand All @@ -150,7 +154,7 @@ protected void setupCluster(PulsarClusterSpec spec) throws Exception {

beforeStartCluster();

pulsarCluster.start();
pulsarCluster.start(doInit);

pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.upgrade;

import com.github.dockerjava.api.model.Bind;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testng.annotations.Test;
import java.util.stream.Stream;
import static java.util.stream.Collectors.joining;
import static org.testng.Assert.assertEquals;

/**
* Test upgrading/downgrading Pulsar cluster from major releases.
*/
@Slf4j
public class PulsarUpgradeDowngradeTest extends PulsarClusterTestBase {

@Test(timeOut=600_000)
public void upgradeFrom_2_10_6() throws Exception {
testUpgradeDowngrade("apachepulsar/pulsar:2.10.6", PulsarContainer.DEFAULT_IMAGE_NAME);
}

@Test(timeOut=600_000)
public void upgradeFrom_3_0_5() throws Exception {
testUpgradeDowngrade("apachepulsar/pulsar:3.0.5", PulsarContainer.DEFAULT_IMAGE_NAME);
}

private void testUpgradeDowngrade(String imageOld, String imageNew) throws Exception {
final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5))
.filter(s -> !s.isEmpty())
.collect(joining("-"));
String topicName = generateTopicName("testupdown", true);

@Cleanup
Network network = Network.newNetwork();
@Cleanup
GenericContainer<?> alpine = new GenericContainer<>("alpine:3.20.1")
.withExposedPorts(80)
.withNetwork(network)
.withNetworkAliases("shared-storage")
.withEnv("MAGIC_NUMBER", "42")
.withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
.getHostConfig()
.withBinds(Bind.parse("/pulsar/data:/pulsar/data")))
.withCommand("/bin/sh", "-c",
"mkdir -p /pulsar/data && "
+ "chmod -R ug+rwx /pulsar/data && "
+ "chown -R 10000:0 /pulsar/data && "
+ "rm -rf /pulsar/data/* && "
+ "while true; do echo \"$MAGIC_NUMBER\" | nc -l -p 80; done");
alpine.start();

PulsarClusterSpec specOld = PulsarClusterSpec.builder()
.numBookies(2)
.numBrokers(1)
.clusterName(clusterName)
.dataContainer(alpine)
.pulsarTestImage(imageOld)
.build();

PulsarClusterSpec specNew = PulsarClusterSpec.builder()
.numBookies(2)
.numBrokers(1)
.clusterName(clusterName)
.dataContainer(alpine)
.pulsarTestImage(imageNew)
.build();

log.info("Setting up OLD cluster {} with {} bookies, {} brokers using {}",
specOld.clusterName(), specOld.numBookies(), specOld.numBrokers(), imageOld);

pulsarCluster = PulsarCluster.forSpec(specNew, network);
pulsarCluster.closeNetworkOnExit = false;
pulsarCluster.start(true);

try {
log.info("setting retention");
pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
"set-retention", "--size", "100M", "--time", "100m", "public/default");

publishAndConsume(topicName, pulsarCluster.getPlainTextServiceUrl(), 10, 10);
} finally {
pulsarCluster.stop();
}

log.info("Upgrading to NEW cluster {} with {} bookies, {} brokers using {}",
specNew.clusterName(), specNew.numBookies(), specNew.numBrokers(), imageNew);

pulsarCluster = PulsarCluster.forSpec(specNew, network);
pulsarCluster.closeNetworkOnExit = false;
pulsarCluster.start(false);

try {
publishAndConsume(topicName, pulsarCluster.getPlainTextServiceUrl(), 10, 20);
} finally {
pulsarCluster.stop();
}

log.info("Downgrading to OLD cluster {} with {} bookies, {} brokers using {}",
specOld.clusterName(), specOld.numBookies(), specOld.numBrokers(), imageOld);

pulsarCluster = PulsarCluster.forSpec(specOld, network);
pulsarCluster.closeNetworkOnExit = false;
pulsarCluster.start(false);

try {
publishAndConsume(topicName, pulsarCluster.getPlainTextServiceUrl(), 10, 30);
} finally {
pulsarCluster.stop();
alpine.stop();
network.close();
}
}

private void publishAndConsume(String topicName, String serviceUrl, int numProduce, int numConsume) throws Exception {
log.info("publishAndConsume: topic name: {}", topicName);

@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.build();

@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topicName)
.create();

log.info("Publishing {} messages", numProduce);
for (int i = numConsume - numProduce; i < numConsume; i++) {
log.info("Publishing message: {}", "smoke-message-" + i);
producer.send("smoke-message-" + i);
}

@Cleanup
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("my-sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
consumer.seek(MessageId.earliest);

log.info("Consuming {} messages", numConsume);
for (int i = 0; i < numConsume; i++) {
log.info("Waiting for message: {}", i);
Message<String> m = consumer.receive();
log.info("Received message: {}", m.getValue());
assertEquals("smoke-message-" + i, m.getValue());
}
}
}
2 changes: 1 addition & 1 deletion tests/integration/src/test/resources/pulsar-upgrade.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<suite name="Pulsar Upgrade Integration Tests" verbose="2" annotations="JDK">
<test name="pulsar-upgrade-test-suite" preserve-order="true" >
<classes>
<class name="org.apache.pulsar.tests.integration.upgrade.PulsarZKDowngradeTest" />
<class name="org.apache.pulsar.tests.integration.upgrade.PulsarUpgradeDowngradeTest" />
</classes>
</test>
</suite>

0 comments on commit 6a4cff8

Please sign in to comment.