Skip to content

Commit

Permalink
[improve][broker] Register the broker to metadata store without versi…
Browse files Browse the repository at this point in the history
…on id compare (apache#23298)
  • Loading branch information
BewareMyPower authored Sep 13, 2024
1 parent fc60ec0 commit 13c19b5
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import static org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
Expand All @@ -39,11 +39,11 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.metadata.api.extended.CreateOption;

/**
* The broker registry impl, base on the LockManager.
Expand All @@ -57,16 +57,14 @@ public class BrokerRegistryImpl implements BrokerRegistry {

private final BrokerLookupData brokerLookupData;

private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
private final MetadataCache<BrokerLookupData> brokerLookupDataMetadataCache;

private final String brokerId;
private final String brokerIdKeyPath;

private final ScheduledExecutorService scheduler;

private final List<BiConsumer<String, NotificationType>> listeners;

private volatile ResourceLock<BrokerLookupData> brokerLookupDataLock;

protected enum State {
Init,
Started,
Expand All @@ -79,10 +77,10 @@ protected enum State {
public BrokerRegistryImpl(PulsarService pulsar) {
this.pulsar = pulsar;
this.conf = pulsar.getConfiguration();
this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
this.brokerLookupDataMetadataCache = pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class);
this.scheduler = pulsar.getLoadManagerExecutor();
this.listeners = new ArrayList<>();
this.brokerId = pulsar.getBrokerId();
this.brokerIdKeyPath = keyPath(pulsar.getBrokerId());
this.brokerLookupData = new BrokerLookupData(
pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(),
Expand Down Expand Up @@ -122,7 +120,7 @@ public boolean isStarted() {
public synchronized void register() throws MetadataStoreException {
if (this.state == State.Started) {
try {
this.brokerLookupDataLock = brokerLookupDataLockManager.acquireLock(keyPath(brokerId), brokerLookupData)
brokerLookupDataMetadataCache.put(brokerIdKeyPath, brokerLookupData, EnumSet.of(CreateOption.Ephemeral))
.get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
this.state = State.Registered;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
Expand All @@ -135,30 +133,37 @@ public synchronized void register() throws MetadataStoreException {
public synchronized void unregister() throws MetadataStoreException {
if (this.state == State.Registered) {
try {
this.brokerLookupDataLock.release()
brokerLookupDataMetadataCache.delete(brokerIdKeyPath)
.get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
this.state = State.Started;
} catch (CompletionException | InterruptedException | ExecutionException | TimeoutException e) {
} catch (ExecutionException e) {
if (e.getCause() instanceof MetadataStoreException.NotFoundException) {
log.warn("{} has already been unregistered", brokerIdKeyPath);
} else {
throw MetadataStoreException.unwrap(e);
}
} catch (InterruptedException | TimeoutException e) {
throw MetadataStoreException.unwrap(e);
} finally {
this.state = State.Started;
}
}
}

@Override
public String getBrokerId() {
return this.brokerId;
return pulsar.getBrokerId();
}

@Override
public CompletableFuture<List<String>> getAvailableBrokersAsync() {
this.checkState();
return brokerLookupDataLockManager.listLocks(LOADBALANCE_BROKERS_ROOT).thenApply(ArrayList::new);
return brokerLookupDataMetadataCache.getChildren(LOADBALANCE_BROKERS_ROOT);
}

@Override
public CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String broker) {
this.checkState();
return brokerLookupDataLockManager.readLock(keyPath(broker));
return brokerLookupDataMetadataCache.get(keyPath(broker));
}

public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
Expand Down Expand Up @@ -192,13 +197,8 @@ public synchronized void close() throws PulsarServerException {
try {
this.listeners.clear();
this.unregister();
this.brokerLookupDataLockManager.close();
} catch (Exception ex) {
if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
throw new PulsarServerException.NotFoundException(MetadataStoreException.unwrap(ex));
} else {
throw new PulsarServerException(MetadataStoreException.unwrap(ex));
}
log.error("Unexpected error when unregistering the broker registry", ex);
} finally {
this.state = State.Closed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ public void testRegisterAndLookup() throws Exception {
}

@Test
public void testRegisterFailWithSameBrokerId() throws Exception {
public void testRegisterWithSameBrokerId() throws Exception {
PulsarService pulsar1 = createPulsarService();
PulsarService pulsar2 = createPulsarService();
pulsar1.start();
Expand All @@ -301,14 +301,10 @@ public void testRegisterFailWithSameBrokerId() throws Exception {
BrokerRegistryImpl brokerRegistry1 = createBrokerRegistryImpl(pulsar1);
BrokerRegistryImpl brokerRegistry2 = createBrokerRegistryImpl(pulsar2);
brokerRegistry1.start();
try {
brokerRegistry2.start();
fail();
} catch (Exception ex) {
log.info("Broker registry start failed.", ex);
assertTrue(ex instanceof PulsarServerException);
assertTrue(ex.getMessage().contains("LockBusyException"));
}
brokerRegistry2.start();

pulsar1.close();
pulsar2.close();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
*/
package org.apache.pulsar.metadata.api;

import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.extended.CreateOption;

/**
* Represent the caching layer access for a specific type of objects.
Expand Down Expand Up @@ -128,6 +130,24 @@ public interface MetadataCache<T> {
*/
CompletableFuture<Void> create(String path, T value);

/**
* Create or update the value of the given path in the metadata store without version comparison.
* <p>
* This method is equivalent to
* {@link org.apache.pulsar.metadata.api.extended.MetadataStoreExtended#put(String, byte[], Optional, EnumSet)} or
* {@link MetadataStore#put(String, byte[], Optional)} if the metadata store does not support this extended API,
* with `Optional.empty()` as the 3rd argument. It means if the path does not exist, it will be created. If the path
* already exists, the new value will override the old value.
* </p>
* @param path the path of the object in the metadata store
* @param value the object to put in the metadata store
* @param options the create options if the path does not in the metadata store
* @return the future that indicates if this operation failed, it could fail with
* {@link java.io.IOException} if the value failed to be serialized
* {@link MetadataStoreException} if the metadata store operation failed
*/
CompletableFuture<Void> put(String path, T value, EnumSet<CreateOption> options);

/**
* Delete an object from the metadata store.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -47,12 +48,15 @@
import org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializationException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;

@Slf4j
public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notification> {
@Getter
private final MetadataStore store;
private final MetadataStoreExtended storeExtended;
private final MetadataSerde<T> serde;

private final AsyncLoadingCache<String, Optional<CacheGetResult<T>>> objCache;
Expand All @@ -67,6 +71,11 @@ public MetadataCacheImpl(MetadataStore store, JavaType type, MetadataCacheConfig

public MetadataCacheImpl(MetadataStore store, MetadataSerde<T> serde, MetadataCacheConfig cacheConfig) {
this.store = store;
if (store instanceof MetadataStoreExtended) {
this.storeExtended = (MetadataStoreExtended) store;
} else {
this.storeExtended = null;
}
this.serde = serde;

Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
Expand Down Expand Up @@ -243,6 +252,21 @@ public CompletableFuture<Void> create(String path, T value) {
return future;
}

@Override
public CompletableFuture<Void> put(String path, T value, EnumSet<CreateOption> options) {
final byte[] bytes;
try {
bytes = serde.serialize(path, value);
} catch (IOException e) {
return CompletableFuture.failedFuture(e);
}
if (storeExtended != null) {
return storeExtended.put(path, bytes, Optional.empty(), options).thenAccept(__ -> refresh(path));
} else {
return store.put(path, bytes, Optional.empty()).thenAccept(__ -> refresh(path));
}
}

@Override
public CompletableFuture<Void> delete(String path) {
return store.delete(path, Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -55,6 +56,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -597,4 +599,27 @@ public CustomClass deserialize(String path, byte[] content, Stat stat) throws IO
assertEquals(res.getValue().b, 2);
assertEquals(res.getValue().path, key1);
}

@Test(dataProvider = "distributedImpl")
public void testPut(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup final var store1 = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder()
.build());
final var cache1 = store1.getMetadataCache(Integer.class);
@Cleanup final var store2 = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder()
.build());
final var cache2 = store2.getMetadataCache(Integer.class);
final var key = "/testPut";

cache1.put(key, 1, EnumSet.of(CreateOption.Ephemeral)); // create
Awaitility.await().untilAsserted(() -> {
assertEquals(cache1.get(key).get().orElse(-1), 1);
assertEquals(cache2.get(key).get().orElse(-1), 1);
});

cache2.put(key, 2, EnumSet.of(CreateOption.Ephemeral)); // update
Awaitility.await().untilAsserted(() -> {
assertEquals(cache1.get(key).get().orElse(-1), 2);
assertEquals(cache2.get(key).get().orElse(-1), 2);
});
}
}

0 comments on commit 13c19b5

Please sign in to comment.