Skip to content

Commit

Permalink
Merge branch 'master' into atomic_move
Browse files Browse the repository at this point in the history
  • Loading branch information
lifepuzzlefun1 committed Jan 8, 2024
2 parents 4c5db66 + 03d0a36 commit 1c5174b
Show file tree
Hide file tree
Showing 47 changed files with 425 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException;
Expand All @@ -58,6 +59,7 @@
public class GarbageCollectorThread implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(GarbageCollectorThread.class);
private static final int SECOND = 1000;
private static final long MINUTE = TimeUnit.MINUTES.toMillis(1);

// Maps entry log files to the set of ledgers that comprise the file and the size usage per ledger
private EntryLogMetadataMap entryLogMetaMap;
Expand Down Expand Up @@ -591,6 +593,13 @@ void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMet
entryLogUsageBuckets);

final int maxBucket = calculateUsageIndex(numBuckets, threshold);
int totalEntryLogIds = 0;
for (int currBucket = 0; currBucket <= maxBucket; currBucket++) {
totalEntryLogIds += compactableBuckets.get(currBucket).size();
}
long lastPrintTimestamp = 0;
AtomicInteger processedEntryLogCnt = new AtomicInteger(0);

stopCompaction:
for (int currBucket = 0; currBucket <= maxBucket; currBucket++) {
LinkedList<Long> entryLogIds = compactableBuckets.get(currBucket);
Expand All @@ -608,7 +617,11 @@ void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMet

final int bucketIndex = currBucket;
final long logId = entryLogIds.remove();

if (System.currentTimeMillis() - lastPrintTimestamp >= MINUTE) {
lastPrintTimestamp = System.currentTimeMillis();
LOG.info("Compaction progress {} / {}, current compaction entryLogId: {}",
processedEntryLogCnt.get(), totalEntryLogIds, logId);
}
entryLogMetaMap.forKey(logId, (entryLogId, meta) -> {
if (meta == null) {
if (LOG.isDebugEnabled()) {
Expand All @@ -625,6 +638,7 @@ void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMet
compactEntryLog(meta);
gcStats.getReclaimedSpaceViaCompaction().addCount(meta.getTotalSize() - priorRemainingSize);
compactedBuckets[bucketIndex]++;
processedEntryLogCnt.getAndIncrement();
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1631,14 +1631,17 @@ public void decommissionBookie(BookieId bookieAddress)

private void waitForLedgersToBeReplicated(Collection<Long> ledgers, BookieId thisBookieAddress,
LedgerManager ledgerManager) throws InterruptedException, TimeoutException {
int maxSleepTimeInBetweenChecks = 10 * 60 * 1000; // 10 minutes
int sleepTimePerLedger = 10 * 1000; // 10 secs
int maxSleepTimeInBetweenChecks = 5 * 60 * 1000; // 5 minutes
int sleepTimePerLedger = 3 * 1000; // 3 secs
Predicate<Long> validateBookieIsNotPartOfEnsemble = ledgerId -> !areEntriesOfLedgerStoredInTheBookie(ledgerId,
thisBookieAddress, ledgerManager);
ledgers.removeIf(validateBookieIsNotPartOfEnsemble);

while (!ledgers.isEmpty()) {
LOG.info("Count of Ledgers which need to be rereplicated: {}", ledgers.size());
int sleepTimeForThisCheck = (long) ledgers.size() * sleepTimePerLedger > maxSleepTimeInBetweenChecks
? maxSleepTimeInBetweenChecks : ledgers.size() * sleepTimePerLedger;
LOG.info("Count of Ledgers which need to be rereplicated: {}, waiting {} seconds for next check",
ledgers.size(), sleepTimeForThisCheck / 1000);
Thread.sleep(sleepTimeForThisCheck);
if (LOG.isDebugEnabled()) {
LOG.debug("Making sure following ledgers replication to be completed: {}", ledgers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ static Set<LedgerFragment> splitIntoSubFragments(LedgerHandle lh,
assert false;
}

long numberOfEntriesToReplicate = (lastEntryId - firstEntryId) + 1;
long numberOfEntriesToReplicate = firstEntryId == INVALID_ENTRY_ID ? 0 : (lastEntryId - firstEntryId) + 1;
long splitsWithFullEntries = numberOfEntriesToReplicate
/ rereplicationEntryBatchSize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,40 @@ protected RackawareEnsemblePlacementPolicy initialize(DNSToSwitchMapping dnsReso
boolean enforceMinNumRacksPerWriteQuorum,
boolean ignoreLocalNodeInPlacementPolicy,
StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) {
return initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds,
reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum,
enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, false,
statsLogger, bookieAddressResolver);
}

@Override
protected RackawareEnsemblePlacementPolicy initialize(DNSToSwitchMapping dnsResolver,
HashedWheelTimer timer,
boolean reorderReadsRandom,
int stabilizePeriodSeconds,
int reorderThresholdPendingRequests,
boolean isWeighted,
int maxWeightMultiple,
int minNumRacksPerWriteQuorum,
boolean enforceMinNumRacksPerWriteQuorum,
boolean ignoreLocalNodeInPlacementPolicy,
boolean useHostnameResolveLocalNodePlacementPolicy,
StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) {
if (stabilizePeriodSeconds > 0) {
super.initialize(dnsResolver, timer, reorderReadsRandom, 0, reorderThresholdPendingRequests, isWeighted,
maxWeightMultiple, minNumRacksPerWriteQuorum, enforceMinNumRacksPerWriteQuorum,
ignoreLocalNodeInPlacementPolicy, statsLogger, bookieAddressResolver);
ignoreLocalNodeInPlacementPolicy, useHostnameResolveLocalNodePlacementPolicy,
statsLogger, bookieAddressResolver);
slave = new RackawareEnsemblePlacementPolicyImpl(enforceDurability);
slave.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds,
reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum,
enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, statsLogger,
bookieAddressResolver);
enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy,
useHostnameResolveLocalNodePlacementPolicy, statsLogger, bookieAddressResolver);
} else {
super.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds,
reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum,
enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, statsLogger,
bookieAddressResolver);
enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy,
useHostnameResolveLocalNodePlacementPolicy, statsLogger, bookieAddressResolver);
slave = null;
}
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
protected int minNumRacksPerWriteQuorum;
protected boolean enforceMinNumRacksPerWriteQuorum;
protected boolean ignoreLocalNodeInPlacementPolicy;
protected boolean useHostnameResolveLocalNodePlacementPolicy;

public static final String REPP_RANDOM_READ_REORDERING = "ensembleRandomReadReordering";

Expand Down Expand Up @@ -144,6 +145,41 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
topology = new NetworkTopologyImpl();
}

/**
* Initialize the policy.
*
* @param dnsResolver
* @param timer
* @param reorderReadsRandom
* @param stabilizePeriodSeconds
* @param reorderThresholdPendingRequests
* @param isWeighted
* @param maxWeightMultiple
* @param minNumRacksPerWriteQuorum
* @param enforceMinNumRacksPerWriteQuorum
* @param ignoreLocalNodeInPlacementPolicy
* @param statsLogger
* @param bookieAddressResolver
* @return initialized ensemble placement policy
*/
protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dnsResolver,
HashedWheelTimer timer,
boolean reorderReadsRandom,
int stabilizePeriodSeconds,
int reorderThresholdPendingRequests,
boolean isWeighted,
int maxWeightMultiple,
int minNumRacksPerWriteQuorum,
boolean enforceMinNumRacksPerWriteQuorum,
boolean ignoreLocalNodeInPlacementPolicy,
StatsLogger statsLogger,
BookieAddressResolver bookieAddressResolver) {
return initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds,
reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum,
enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy,
false, statsLogger, bookieAddressResolver);
}

/**
* Initialize the policy.
*
Expand All @@ -160,6 +196,7 @@ protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dns
int minNumRacksPerWriteQuorum,
boolean enforceMinNumRacksPerWriteQuorum,
boolean ignoreLocalNodeInPlacementPolicy,
boolean useHostnameResolveLocalNodePlacementPolicy,
StatsLogger statsLogger,
BookieAddressResolver bookieAddressResolver) {
checkNotNull(statsLogger, "statsLogger should not be null, use NullStatsLogger instead.");
Expand Down Expand Up @@ -195,6 +232,7 @@ public Integer getSample() {
this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
this.enforceMinNumRacksPerWriteQuorum = enforceMinNumRacksPerWriteQuorum;
this.ignoreLocalNodeInPlacementPolicy = ignoreLocalNodeInPlacementPolicy;
this.useHostnameResolveLocalNodePlacementPolicy = useHostnameResolveLocalNodePlacementPolicy;

// create the network topology
if (stabilizePeriodSeconds > 0) {
Expand All @@ -206,7 +244,9 @@ public Integer getSample() {
BookieNode bn = null;
if (!ignoreLocalNodeInPlacementPolicy) {
try {
bn = createDummyLocalBookieNode(InetAddress.getLocalHost().getHostAddress());
String hostname = useHostnameResolveLocalNodePlacementPolicy
? InetAddress.getLocalHost().getCanonicalHostName() : InetAddress.getLocalHost().getHostAddress();
bn = createDummyLocalBookieNode(hostname);
} catch (IOException e) {
LOG.error("Failed to get local host address : ", e);
}
Expand Down Expand Up @@ -303,6 +343,7 @@ public Long load(BookieId key) throws Exception {
conf.getMinNumRacksPerWriteQuorum(),
conf.getEnforceMinNumRacksPerWriteQuorum(),
conf.getIgnoreLocalNodeInPlacementPolicy(),
conf.getUseHostnameResolveLocalNodePlacementPolicy(),
statsLogger,
bookieAddressResolver);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ public void handleBookiesThatJoined(Set<BookieId> joinedBookies) {
.initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds,
this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple,
this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum,
this.ignoreLocalNodeInPlacementPolicy, statsLogger, bookieAddressResolver)
this.ignoreLocalNodeInPlacementPolicy,
this.useHostnameResolveLocalNodePlacementPolicy, statsLogger, bookieAddressResolver)
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
}

Expand Down Expand Up @@ -201,7 +202,8 @@ public void onBookieRackChange(List<BookieId> bookieAddressList) {
this.stabilizePeriodSeconds, this.reorderThresholdPendingRequests,
this.isWeighted, this.maxWeightMultiple,
this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum,
this.ignoreLocalNodeInPlacementPolicy, statsLogger,
this.ignoreLocalNodeInPlacementPolicy,
this.useHostnameResolveLocalNodePlacementPolicy, statsLogger,
bookieAddressResolver)
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
perRegionPlacement.put(newRegion, newRegionPlacement);
Expand Down Expand Up @@ -242,7 +244,8 @@ public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf,
.initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds,
this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple,
this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum,
this.ignoreLocalNodeInPlacementPolicy, statsLogger, bookieAddressResolver)
this.ignoreLocalNodeInPlacementPolicy, this.ignoreLocalNodeInPlacementPolicy,
statsLogger, bookieAddressResolver)
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
}
minRegionsForDurability = conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
protected static final String ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES =
"ensemblePlacementPolicyOrderSlowBookies";
protected static final String BOOKIE_ADDRESS_RESOLVER_ENABLED = "bookieAddressResolverEnabled";
// Use hostname to resolve local placement info
public static final String USE_HOSTNAME_RESOLVE_LOCAL_NODE_PLACEMENT_POLICY =
"useHostnameResolveLocalNodePlacementPolicy";

// Stats
protected static final String ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats";
Expand Down Expand Up @@ -1314,6 +1317,22 @@ public ClientConfiguration setBookieAddressResolverEnabled(boolean enabled) {
return this;
}

/**
* Set the flag to use hostname to resolve local node placement policy.
* @param useHostnameResolveLocalNodePlacementPolicy
*/
public void setUseHostnameResolveLocalNodePlacementPolicy(boolean useHostnameResolveLocalNodePlacementPolicy) {
setProperty(USE_HOSTNAME_RESOLVE_LOCAL_NODE_PLACEMENT_POLICY, useHostnameResolveLocalNodePlacementPolicy);
}

    /**
     * Get whether to use hostname to resolve local node placement policy.
     *
     * @return true if the local node should be resolved by hostname rather than
     *         by IP address; defaults to false when the property is unset
     */
    public boolean getUseHostnameResolveLocalNodePlacementPolicy() {
        return getBoolean(USE_HOSTNAME_RESOLVE_LOCAL_NODE_PLACEMENT_POLICY, false);
    }

/**
* Whether to enable recording task execution stats.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@
* <li>Open hash map with linear probing, no node allocations to store the values
* </ol>
*
 * <b>WARNING: the forEach method does not guarantee thread safety, nor do the keys and values methods.</b>
* <br>
* The forEach method is specifically designed for single-threaded usage. When iterating over a map
* with concurrent writes, it becomes possible for new values to be either observed or not observed.
* There is no guarantee that if we write value1 and value2, and are able to see value2, then we will also see value1.
* In some cases, it is even possible to encounter two mappings with the same key,
* leading the keys method to return a List containing two identical keys.
*
* <br>
* It is crucial to understand that the results obtained from aggregate status methods such as keys and values
* are typically reliable only when the map is not undergoing concurrent updates from other threads.
* When concurrent updates are involved, the results of these methods reflect transient states
* that may be suitable for monitoring or estimation purposes, but not for program control.
* @param <V>
*/
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -260,6 +273,12 @@ public void clear() {
}
}

/**
* Iterate over all the entries in the map and apply the processor function to each of them.
* <p>
 * <b>Warning: this method is not thread-safe.</b>
* @param processor the processor to apply to each entry
*/
public void forEach(EntryProcessor<V> processor) {
for (Section<V> s : sections) {
s.forEach(processor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@
* no node allocations are required to store the values.
*
* <p>Items <strong>MUST</strong> be &gt;= 0.
*
* <br>
 * <b>WARNING: the forEach method does not guarantee thread safety, nor does the items method.</b>
* <br>
* The forEach method is specifically designed for single-threaded usage. When iterating over a set
* with concurrent writes, it becomes possible for new values to be either observed or not observed.
* There is no guarantee that if we write value1 and value2, and are able to see value2, then we will also see value1.
*
* <br>
* It is crucial to understand that the results obtained from aggregate status methods such as items
 * are typically reliable only when the set is not undergoing concurrent updates from other threads.
* When concurrent updates are involved, the results of these methods reflect transient states
* that may be suitable for monitoring or estimation purposes, but not for program control.
*/
public class ConcurrentLongHashSet {

Expand Down
Loading

0 comments on commit 1c5174b

Please sign in to comment.