From 13c19b50216ba7e73766e6fa7b57d2700614e3b5 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 13 Sep 2024 19:07:12 +0800 Subject: [PATCH] [improve][broker] Register the broker to metadata store without version id compare (#23298) --- .../extensions/BrokerRegistryImpl.java | 44 +++++++++---------- .../extensions/BrokerRegistryTest.java | 14 +++--- .../pulsar/metadata/api/MetadataCache.java | 20 +++++++++ .../cache/impl/MetadataCacheImpl.java | 24 ++++++++++ .../pulsar/metadata/MetadataCacheTest.java | 25 +++++++++++ 5 files changed, 96 insertions(+), 31 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index 5db11d40c33ff..f34d377990b68 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -21,11 +21,11 @@ import static org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; @@ -39,11 +39,11 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; -import org.apache.pulsar.metadata.api.coordination.LockManager; -import org.apache.pulsar.metadata.api.coordination.ResourceLock; +import org.apache.pulsar.metadata.api.extended.CreateOption; /** * The broker registry impl, base on the LockManager. @@ -57,16 +57,14 @@ public class BrokerRegistryImpl implements BrokerRegistry { private final BrokerLookupData brokerLookupData; - private final LockManager brokerLookupDataLockManager; + private final MetadataCache brokerLookupDataMetadataCache; - private final String brokerId; + private final String brokerIdKeyPath; private final ScheduledExecutorService scheduler; private final List> listeners; - private volatile ResourceLock brokerLookupDataLock; - protected enum State { Init, Started, @@ -79,10 +77,10 @@ protected enum State { public BrokerRegistryImpl(PulsarService pulsar) { this.pulsar = pulsar; this.conf = pulsar.getConfiguration(); - this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class); + this.brokerLookupDataMetadataCache = pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class); this.scheduler = pulsar.getLoadManagerExecutor(); this.listeners = new ArrayList<>(); - this.brokerId = pulsar.getBrokerId(); + this.brokerIdKeyPath = keyPath(pulsar.getBrokerId()); this.brokerLookupData = new BrokerLookupData( pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), @@ -122,7 +120,7 @@ public boolean isStarted() { public synchronized void register() throws MetadataStoreException { if (this.state == State.Started) { try { - this.brokerLookupDataLock = brokerLookupDataLockManager.acquireLock(keyPath(brokerId), brokerLookupData) + brokerLookupDataMetadataCache.put(brokerIdKeyPath, brokerLookupData, EnumSet.of(CreateOption.Ephemeral)) .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); this.state = State.Registered; } catch (InterruptedException | ExecutionException | TimeoutException e) { @@ -135,30 +133,37 @@ public synchronized void register() throws MetadataStoreException { public synchronized void unregister() throws MetadataStoreException { if (this.state == State.Registered) { try { - this.brokerLookupDataLock.release() + brokerLookupDataMetadataCache.delete(brokerIdKeyPath) .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); - this.state = State.Started; - } catch (CompletionException | InterruptedException | ExecutionException | TimeoutException e) { + } catch (ExecutionException e) { + if (e.getCause() instanceof MetadataStoreException.NotFoundException) { + log.warn("{} has already been unregistered", brokerIdKeyPath); + } else { + throw MetadataStoreException.unwrap(e); + } + } catch (InterruptedException | TimeoutException e) { throw MetadataStoreException.unwrap(e); + } finally { + this.state = State.Started; } } } @Override public String getBrokerId() { - return this.brokerId; + return pulsar.getBrokerId(); } @Override public CompletableFuture> getAvailableBrokersAsync() { this.checkState(); - return brokerLookupDataLockManager.listLocks(LOADBALANCE_BROKERS_ROOT).thenApply(ArrayList::new); + return brokerLookupDataMetadataCache.getChildren(LOADBALANCE_BROKERS_ROOT); } @Override public CompletableFuture> lookupAsync(String broker) { this.checkState(); - return brokerLookupDataLockManager.readLock(keyPath(broker)); + return brokerLookupDataMetadataCache.get(keyPath(broker)); } public CompletableFuture> getAvailableBrokerLookupDataAsync() { @@ -192,13 +197,8 @@ public synchronized void close() throws PulsarServerException { try { this.listeners.clear(); this.unregister(); - this.brokerLookupDataLockManager.close(); } catch (Exception ex) { - if (ex.getCause() instanceof MetadataStoreException.NotFoundException) { - throw new PulsarServerException.NotFoundException(MetadataStoreException.unwrap(ex)); - } else { - throw new PulsarServerException(MetadataStoreException.unwrap(ex)); - } + log.error("Unexpected error when unregistering the broker registry", ex); } finally { this.state = State.Closed; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java index 42600a4203551..91ada90dda690 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java @@ -291,7 +291,7 @@ public void testRegisterAndLookup() throws Exception { } @Test - public void testRegisterFailWithSameBrokerId() throws Exception { + public void testRegisterWithSameBrokerId() throws Exception { PulsarService pulsar1 = createPulsarService(); PulsarService pulsar2 = createPulsarService(); pulsar1.start(); @@ -301,14 +301,10 @@ public void testRegisterFailWithSameBrokerId() throws Exception { BrokerRegistryImpl brokerRegistry1 = createBrokerRegistryImpl(pulsar1); BrokerRegistryImpl brokerRegistry2 = createBrokerRegistryImpl(pulsar2); brokerRegistry1.start(); - try { - brokerRegistry2.start(); - fail(); - } catch (Exception ex) { - log.info("Broker registry start failed.", ex); - assertTrue(ex instanceof PulsarServerException); - assertTrue(ex.getMessage().contains("LockBusyException")); - } + brokerRegistry2.start(); + + pulsar1.close(); + pulsar2.close(); } @Test diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java index 6d558e709716d..8e153b23d3087 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java @@ -18,12 +18,14 @@ */ package org.apache.pulsar.metadata.api; +import java.util.EnumSet; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException; import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; +import org.apache.pulsar.metadata.api.extended.CreateOption; /** * Represent the caching layer access for a specific type of objects. @@ -128,6 +130,24 @@ public interface MetadataCache { */ CompletableFuture create(String path, T value); + /** + * Create or update the value of the given path in the metadata store without version comparison. + *

+ * This method is equivalent to + * {@link org.apache.pulsar.metadata.api.extended.MetadataStoreExtended#put(String, byte[], Optional, EnumSet)} or + * {@link MetadataStore#put(String, byte[], Optional)} if the metadata store does not support this extended API, + * with `Optional.empty()` as the 3rd argument. It means if the path does not exist, it will be created. If the path + * already exists, the new value will override the old value. + *

+ * @param path the path of the object in the metadata store + * @param value the object to put in the metadata store + * @param options the create options if the path does not in the metadata store + * @return the future that indicates if this operation failed, it could fail with + * {@link java.io.IOException} if the value failed to be serialized + * {@link MetadataStoreException} if the metadata store operation failed + */ + CompletableFuture put(String path, T value, EnumSet options); + /** * Delete an object from the metadata store. *

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index b9051a7dc7df4..ee394b0267c88 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -25,6 +25,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; +import java.util.EnumSet; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -47,12 +48,15 @@ import org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializationException; import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.extended.CreateOption; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.AbstractMetadataStore; @Slf4j public class MetadataCacheImpl implements MetadataCache, Consumer { @Getter private final MetadataStore store; + private final MetadataStoreExtended storeExtended; private final MetadataSerde serde; private final AsyncLoadingCache>> objCache; @@ -67,6 +71,11 @@ public MetadataCacheImpl(MetadataStore store, JavaType type, MetadataCacheConfig public MetadataCacheImpl(MetadataStore store, MetadataSerde serde, MetadataCacheConfig cacheConfig) { this.store = store; + if (store instanceof MetadataStoreExtended) { + this.storeExtended = (MetadataStoreExtended) store; + } else { + this.storeExtended = null; + } this.serde = serde; Caffeine cacheBuilder = Caffeine.newBuilder(); @@ -243,6 +252,21 @@ public CompletableFuture create(String path, T value) { return future; } + @Override + public CompletableFuture put(String path, T value, EnumSet options) { + final byte[] bytes; + try { + bytes = serde.serialize(path, value); + } catch (IOException e) { + return CompletableFuture.failedFuture(e); + } + if (storeExtended != null) { + return storeExtended.put(path, bytes, Optional.empty(), options).thenAccept(__ -> refresh(path)); + } else { + return store.put(path, bytes, Optional.empty()).thenAccept(__ -> refresh(path)); + } + } + @Override public CompletableFuture delete(String path) { return store.delete(path, Optional.empty()); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java index df59d25bdcc0e..bac5807360453 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -55,6 +56,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreFactory; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.Stat; +import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl; import org.awaitility.Awaitility; import org.testng.annotations.DataProvider; @@ -597,4 +599,27 @@ public CustomClass deserialize(String path, byte[] content, Stat stat) throws IO assertEquals(res.getValue().b, 2); assertEquals(res.getValue().path, key1); } + + @Test(dataProvider = "distributedImpl") + public void testPut(String provider, Supplier urlSupplier) throws Exception { + @Cleanup final var store1 = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder() + .build()); + final var cache1 = store1.getMetadataCache(Integer.class); + @Cleanup final var store2 = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder() + .build()); + final var cache2 = store2.getMetadataCache(Integer.class); + final var key = "/testPut"; + + cache1.put(key, 1, EnumSet.of(CreateOption.Ephemeral)); // create + Awaitility.await().untilAsserted(() -> { + assertEquals(cache1.get(key).get().orElse(-1), 1); + assertEquals(cache2.get(key).get().orElse(-1), 1); + }); + + cache2.put(key, 2, EnumSet.of(CreateOption.Ephemeral)); // update + Awaitility.await().untilAsserted(() -> { + assertEquals(cache1.get(key).get().orElse(-1), 2); + assertEquals(cache2.get(key).get().orElse(-1), 2); + }); + } }