Skip to content

Commit

Permalink
[fix][broker] Fixed the ExtensibleLoadManagerImpl internal system get…
Browse files Browse the repository at this point in the history
…Topic failure when the leadership changes apache#21764 (apache#21801)
  • Loading branch information
heesung-sn committed Jan 4, 2024
1 parent d46846a commit 7632083
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,18 @@ public static void createSystemTopic(PulsarService pulsar, String topic) throws
log.info("Created topic {}.", topic);
} catch (PulsarAdminException.ConflictException ex) {
if (debug(pulsar.getConfiguration(), log)) {
log.info("Topic {} already exists.", topic, ex);
log.info("Topic {} already exists.", topic);
}
} catch (PulsarAdminException e) {
throw new PulsarServerException(e);
}
}

private static void createSystemTopics(PulsarService pulsar) throws PulsarServerException {
createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}

@Override
public void start() throws PulsarServerException {
if (this.started) {
Expand Down Expand Up @@ -321,13 +326,9 @@ public void start() throws PulsarServerException {
this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));

createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);

try {
this.brokerLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
this.brokerLoadDataStore.startTableView();
this.topBundlesLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
} catch (LoadDataStoreException e) {
Expand Down Expand Up @@ -382,7 +383,6 @@ public void start() throws PulsarServerException {
this.unloadScheduler = new UnloadScheduler(
pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context,
serviceUnitStateChannel, unloadCounter, unloadMetrics);
this.unloadScheduler.start();
this.splitScheduler = new SplitScheduler(
pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
this.splitScheduler.start();
Expand Down Expand Up @@ -740,74 +740,74 @@ public static boolean isInternalTopic(String topic) {

@VisibleForTesting
void playLeader() {
if (role != Leader) {
log.info("This broker:{} is changing the role from {} to {}",
pulsar.getLookupServiceAddress(), role, Leader);
int retry = 0;
while (true) {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getLookupServiceAddress(), role, Leader);
int retry = 0;
while (!Thread.currentThread().isInterrupted()) {
try {
initWaiter.await();
// Confirm the system topics have been created or create them if they do not exist.
// If the leader has changed, the new leader need to reset
// the local brokerService.topics (by this topic creations).
// Otherwise, the system topic existence check will fail on the leader broker.
createSystemTopics(pulsar);
brokerLoadDataStore.init();
topBundlesLoadDataStore.init();
unloadScheduler.start();
serviceUnitStateChannel.scheduleOwnershipMonitor();
break;
} catch (Throwable e) {
log.error("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getLookupServiceAddress(), ++retry, e);
try {
initWaiter.await();
serviceUnitStateChannel.scheduleOwnershipMonitor();
topBundlesLoadDataStore.startTableView();
unloadScheduler.start();
break;
} catch (Throwable e) {
log.error("The broker:{} failed to change the role. Retrying {} th ...",
pulsar.getLookupServiceAddress(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
} catch (InterruptedException ex) {
log.warn("Interrupted while sleeping.");
}
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
} catch (InterruptedException ex) {
log.warn("Interrupted while sleeping.");
// preserve thread's interrupt status
Thread.currentThread().interrupt();
}
}
role = Leader;
log.info("This broker:{} plays the leader now.", pulsar.getLookupServiceAddress());
}
role = Leader;
log.info("This broker:{} plays the leader now.", pulsar.getLookupServiceAddress());

// flush the load data when the leader is elected.
if (brokerLoadDataReporter != null) {
brokerLoadDataReporter.reportAsync(true);
}
if (topBundleLoadDataReporter != null) {
topBundleLoadDataReporter.reportAsync(true);
}
brokerLoadDataReporter.reportAsync(true);
topBundleLoadDataReporter.reportAsync(true);
}

@VisibleForTesting
void playFollower() {
if (role != Follower) {
log.info("This broker:{} is changing the role from {} to {}",
pulsar.getLookupServiceAddress(), role, Follower);
int retry = 0;
while (true) {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getLookupServiceAddress(), role, Follower);
int retry = 0;
while (!Thread.currentThread().isInterrupted()) {
try {
initWaiter.await();
unloadScheduler.close();
serviceUnitStateChannel.cancelOwnershipMonitor();
brokerLoadDataStore.init();
topBundlesLoadDataStore.close();
topBundlesLoadDataStore.startProducer();
break;
} catch (Throwable e) {
log.error("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getLookupServiceAddress(), ++retry, e);
try {
initWaiter.await();
serviceUnitStateChannel.cancelOwnershipMonitor();
topBundlesLoadDataStore.closeTableView();
unloadScheduler.close();
break;
} catch (Throwable e) {
log.error("The broker:{} failed to change the role. Retrying {} th ...",
pulsar.getLookupServiceAddress(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
} catch (InterruptedException ex) {
log.warn("Interrupted while sleeping.");
}
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
} catch (InterruptedException ex) {
log.warn("Interrupted while sleeping.");
// preserve thread's interrupt status
Thread.currentThread().interrupt();
}
}
role = Follower;
log.info("This broker:{} plays a follower now.", pulsar.getLookupServiceAddress());
}
role = Follower;
log.info("This broker:{} plays a follower now.", pulsar.getLookupServiceAddress());

// flush the load data when the leader is elected.
if (brokerLoadDataReporter != null) {
brokerLoadDataReporter.reportAsync(true);
}
if (topBundleLoadDataReporter != null) {
topBundleLoadDataReporter.reportAsync(true);
}
brokerLoadDataReporter.reportAsync(true);
topBundleLoadDataReporter.reportAsync(true);
}

public List<Metrics> getMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,26 @@ public interface LoadDataStore<T> extends Closeable {
*/
void closeTableView() throws IOException;


/**
* Starts the data store (both producer and table view).
*/
void start() throws LoadDataStoreException;

/**
* Inits the data store (close and start the data store).
*/
void init() throws IOException;

/**
* Starts the table view.
*/
void startTableView() throws LoadDataStoreException;


/**
* Starts the producer.
*/
void startProducer() throws LoadDataStoreException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
*/
public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {

private TableView<T> tableView;
private volatile TableView<T> tableView;

private final Producer<T> producer;
private volatile Producer<T> producer;

private final PulsarClient client;

Expand All @@ -50,7 +50,6 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
public TableViewLoadDataStoreImpl(PulsarClient client, String topic, Class<T> clazz) throws LoadDataStoreException {
try {
this.client = client;
this.producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create();
this.topic = topic;
this.clazz = clazz;
} catch (Exception e) {
Expand Down Expand Up @@ -99,6 +98,12 @@ public void closeTableView() throws IOException {
}
}

@Override
public void start() throws LoadDataStoreException {
startProducer();
startTableView();
}

@Override
public void startTableView() throws LoadDataStoreException {
if (tableView == null) {
Expand All @@ -111,14 +116,33 @@ public void startTableView() throws LoadDataStoreException {
}
}

@Override
public void startProducer() throws LoadDataStoreException {
if (producer == null) {
try {
producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create();
} catch (PulsarClientException e) {
producer = null;
throw new LoadDataStoreException(e);
}
}
}

@Override
public void close() throws IOException {
if (producer != null) {
producer.close();
producer = null;
}
closeTableView();
}

@Override
public void init() throws IOException {
close();
start();
}

private void validateTableViewStart() {
if (tableView == null) {
throw new IllegalStateException("table view has not been started");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -802,12 +802,12 @@ public void testRoleChange() throws Exception {
FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStoreSecondarySpy, true);

if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
primaryLoadManager.playFollower();
primaryLoadManager.playFollower();
primaryLoadManager.playFollower(); // close 3 times
primaryLoadManager.playFollower(); // close 1 time
secondaryLoadManager.playLeader();
secondaryLoadManager.playLeader();
primaryLoadManager.playLeader();
primaryLoadManager.playLeader();
primaryLoadManager.playLeader(); // close 3 times and open 3 times
primaryLoadManager.playLeader(); // close 1 time and open 1 time,
secondaryLoadManager.playFollower();
secondaryLoadManager.playFollower();
} else {
Expand All @@ -822,10 +822,10 @@ public void testRoleChange() throws Exception {
}


verify(topBundlesLoadDataStorePrimarySpy, times(3)).startTableView();
verify(topBundlesLoadDataStorePrimarySpy, times(3)).closeTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(3)).startTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(3)).closeTableView();
verify(topBundlesLoadDataStorePrimarySpy, times(4)).startTableView();
verify(topBundlesLoadDataStorePrimarySpy, times(8)).closeTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(4)).startTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(8)).closeTableView();

FieldUtils.writeDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStorePrimary, true);
FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStoreSecondary, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,25 @@ public void closeTableView() throws IOException {

}

@Override
public void start() throws LoadDataStoreException {

}

@Override
public void init() throws IOException {

}

@Override
public void startTableView() throws LoadDataStoreException {

}

@Override
public void startProducer() throws LoadDataStoreException {

}
};
configuration.setPreferLaterVersions(true);
doReturn(configuration).when(mockContext).brokerConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,25 @@ public void closeTableView() throws IOException {

}

@Override
public void start() throws LoadDataStoreException {

}

@Override
public void init() throws IOException {

}

@Override
public void startTableView() throws LoadDataStoreException {

}

@Override
public void startProducer() throws LoadDataStoreException {

}
};

var topBundleLoadDataStore = new LoadDataStore<TopBundlesLoadData>() {
Expand Down Expand Up @@ -436,10 +451,25 @@ public void closeTableView() throws IOException {

}

@Override
public void start() throws LoadDataStoreException {

}

@Override
public void init() throws IOException {

}

@Override
public void startTableView() throws LoadDataStoreException {

}

@Override
public void startProducer() throws LoadDataStoreException {

}
};

BrokerRegistry brokerRegistry = mock(BrokerRegistry.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public void testPushGetAndRemove() throws Exception {
@Cleanup
LoadDataStore<MyClass> loadDataStore =
LoadDataStoreFactory.create(pulsar.getClient(), topic, MyClass.class);
loadDataStore.startProducer();
loadDataStore.startTableView();
MyClass myClass1 = new MyClass("1", 1);
loadDataStore.pushAsync("key1", myClass1).get();
Expand Down Expand Up @@ -108,6 +109,7 @@ public void testForEach() throws Exception {
@Cleanup
LoadDataStore<Integer> loadDataStore =
LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class);
loadDataStore.startProducer();
loadDataStore.startTableView();

Map<String, Integer> map = new HashMap<>();
Expand All @@ -132,6 +134,7 @@ public void testTableViewRestart() throws Exception {
String topic = TopicDomain.persistent + "://" + NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
LoadDataStore<Integer> loadDataStore =
LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class);
loadDataStore.startProducer();

loadDataStore.startTableView();
loadDataStore.pushAsync("1", 1).get();
Expand Down
Loading

0 comments on commit 7632083

Please sign in to comment.