Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Support async lookup in hash store #4423

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.options.MemorySize;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.paimon.utils.Preconditions.checkNotNull;

Expand All @@ -37,15 +38,15 @@ public class CacheManager {

private final Cache cache;

private int fileReadCount;
private final AtomicInteger fileReadCount;

/** Creates a cache manager using the default (Guava) cache implementation. */
public CacheManager(MemorySize maxMemorySize) {
this(Cache.CacheType.GUAVA, maxMemorySize);
}

/**
 * Creates a cache manager backed by the given cache implementation.
 *
 * @param cacheType which cache implementation to build
 * @param maxMemorySize maximum total weight of cached pages
 */
public CacheManager(Cache.CacheType cacheType, MemorySize maxMemorySize) {
    this.cache = CacheBuilder.newBuilder(cacheType).maximumWeight(maxMemorySize).build();
    this.fileReadCount = new AtomicInteger();
}

@VisibleForTesting
Expand All @@ -58,7 +59,7 @@ public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback cal
cache.get(
key,
k -> {
this.fileReadCount++;
this.fileReadCount.incrementAndGet();
try {
return new Cache.CacheValue(
MemorySegment.wrap(reader.read(key)), callback);
Expand All @@ -74,6 +75,6 @@ public void invalidPage(CacheKey key) {
}

/** Returns how many times a page had to be loaded via the reader (i.e. cache misses). */
public int fileReadCount() {
    return this.fileReadCount.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.concurrent.locks.Lock;

import static org.apache.paimon.mergetree.LookupUtils.fileKibiBytes;
import static org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
Expand All @@ -48,22 +49,16 @@ public class LookupFile {

private static final Logger LOG = LoggerFactory.getLogger(LookupFile.class);

private final File localFile;
private final DataFileMeta remoteFile;
private final LookupStoreReader reader;
private final Runnable callback;
private File localFile;
private DataFileMeta remoteFile;
private LookupStoreReader reader;
private Runnable callback;

private long requestCount;
private long hitCount;
private boolean isClosed = false;

public LookupFile(
File localFile, DataFileMeta remoteFile, LookupStoreReader reader, Runnable callback) {
this.localFile = localFile;
this.remoteFile = remoteFile;
this.reader = reader;
this.callback = callback;
}
private Lock lock;
private boolean isReady;

@Nullable
public byte[] get(byte[] key) throws IOException {
Expand Down Expand Up @@ -112,11 +107,14 @@ public static Cache<String, LookupFile> createCache(
}

/**
 * Weigher for the lookup-file cache: an entry that is not yet fully loaded weighs
 * nothing, otherwise the local file's size in KiB.
 *
 * <p>NOTE(review): caches typically compute an entry's weight once at insertion time —
 * if the entry is inserted before it is ready, its weight may stay 0 after loading;
 * confirm the cache re-weighs on {@code put}.
 */
private static int fileWeigh(String file, LookupFile lookupFile) {
    boolean loaded = lookupFile != null && lookupFile.isReady();
    return loaded ? fileKibiBytes(lookupFile.localFile) : 0;
}

private static void removalCallback(String file, LookupFile lookupFile, RemovalCause cause) {
if (lookupFile != null) {
if (lookupFile != null && lookupFile.isReady()) {
try {
lookupFile.close(cause);
} catch (IOException e) {
Expand All @@ -134,4 +132,42 @@ public static String localFilePrefix(
return String.format("%s-%s-%s", partitionString, bucket, remoteFileName);
}
}

/** Sets the local file backing this lookup file; returns {@code this} for chaining. */
public LookupFile withLocalFile(File localFile) {
this.localFile = localFile;
return this;
}

/** Sets the remote data-file metadata this lookup file was built from; returns {@code this}. */
public LookupFile withRemoteFile(DataFileMeta remoteFile) {
this.remoteFile = remoteFile;
return this;
}

/** Sets the store reader used to serve key lookups; returns {@code this} for chaining. */
public LookupFile withReader(LookupStoreReader reader) {
this.reader = reader;
return this;
}

/** Sets the callback invoked when this lookup file is closed/evicted; returns {@code this}. */
public LookupFile withCallback(Runnable callback) {
this.callback = callback;
return this;
}

/** Sets the per-file lock guarding loading and reading of this lookup file; returns {@code this}. */
public LookupFile withLock(Lock lock) {
this.lock = lock;
return this;
}

/**
 * Marks this lookup file as fully loaded (or not); returns {@code this} for chaining.
 *
 * <p>NOTE(review): {@code isReady} is read by the cache weigher and removal callback,
 * possibly from threads that do not hold this file's lock — confirm the backing field
 * is {@code volatile} or otherwise safely published.
 */
public LookupFile withReady(boolean isReady) {
this.isReady = isReady;
return this;
}

/** Returns the per-file lock guarding loading and reading of this lookup file. */
public Lock getLock() {
    return this.lock;
}

/** Whether the local store has been fully loaded and this file is usable for lookups. */
public boolean isReady() {
    return this.isReady;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import static org.apache.paimon.utils.VarLengthIntUtils.MAX_VAR_LONG_SIZE;
Expand All @@ -54,7 +58,7 @@

/** Provide lookup by key. */
public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {

private final Map<Object, Lock> lockMap = new ConcurrentHashMap<>();
private final Levels levels;
private final Comparator<InternalRow> keyComparator;
private final RowCompactedSerializer keySerializer;
Expand Down Expand Up @@ -126,32 +130,44 @@ private T lookup(InternalRow key, SortedRun level) throws IOException {

@Nullable
private T lookup(InternalRow key, DataFileMeta file) throws IOException {
LookupFile lookupFile = lookupFileCache.getIfPresent(file.fileName());

LookupFile lookupFile = lookupFileCache.get(file.fileName(), k -> createLookupFile(file));
boolean newCreatedLookupFile = false;
if (lookupFile == null) {
lookupFile = createLookupFile(file);
newCreatedLookupFile = true;
}

byte[] valueBytes;
Lock lock = lookupFile.getLock();
lock.lock();
try {
byte[] keyBytes = keySerializer.serializeToBytes(key);
valueBytes = lookupFile.get(keyBytes);
} finally {
if (newCreatedLookupFile) {
lookupFileCache.put(file.fileName(), lookupFile);
if (!lookupFile.isReady()) {
loadDataForLookupFile(file, lookupFile);
newCreatedLookupFile = true;
}

byte[] valueBytes;
try {
byte[] keyBytes = keySerializer.serializeToBytes(key);
valueBytes = lookupFile.get(keyBytes);
} finally {
if (newCreatedLookupFile) {
lookupFileCache.put(file.fileName(), lookupFile);
}
}
if (valueBytes == null) {
return null;
}

return valueProcessor.readFromDisk(
key, lookupFile.remoteFile().level(), valueBytes, file.fileName());
} finally {
lock.unlock();
}
if (valueBytes == null) {
return null;
}
}

return valueProcessor.readFromDisk(
key, lookupFile.remoteFile().level(), valueBytes, file.fileName());
/**
 * Creates an empty {@link LookupFile} shell holding only its per-file lock; the heavy
 * local-file creation is deferred to {@code loadDataForLookupFile}.
 *
 * <p>The lock is keyed by the remote file name — the same key used by {@code
 * lookupFileCache} — so that two {@link DataFileMeta} instances describing the same
 * file always share one lock. Keying by the meta object itself would hand out distinct
 * locks unless {@code DataFileMeta} has value equality, breaking mutual exclusion.
 *
 * <p>NOTE(review): entries are never removed from {@code lockMap}, so it grows with the
 * number of distinct files seen — consider cleanup when files are dropped.
 */
private LookupFile createLookupFile(DataFileMeta file) {
    // Fair lock keeps lookup threads on the same hot file from starving each other.
    Lock lock = lockMap.computeIfAbsent(file.fileName(), k -> new ReentrantLock(true));
    return new LookupFile().withLock(lock);
}

private LookupFile createLookupFile(DataFileMeta file) throws IOException {
private void loadDataForLookupFile(DataFileMeta file, LookupFile lookupFile)
throws IOException {
File localFile = localFileFactory.apply(file.fileName());
if (!localFile.createNewFile()) {
throw new IOException("Can not create new file: " + localFile);
Expand Down Expand Up @@ -191,11 +207,12 @@ private LookupFile createLookupFile(DataFileMeta file) throws IOException {
}

ownCachedFiles.add(file.fileName());
return new LookupFile(
localFile,
file,
lookupStoreFactory.createReader(localFile, context),
() -> ownCachedFiles.remove(file.fileName()));
lookupFile
.withLocalFile(localFile)
.withRemoteFile(file)
.withReader(lookupStoreFactory.createReader(localFile, context))
.withCallback(() -> ownCachedFiles.remove(file.fileName()))
.withReady(true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@

import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
Expand Down Expand Up @@ -85,7 +85,7 @@ public class LocalTableQuery implements TableQuery {

public LocalTableQuery(FileStoreTable table) {
this.options = table.coreOptions();
this.tableView = new HashMap<>();
this.tableView = new ConcurrentHashMap<>();
FileStore<?> tableStore = table.store();
if (!(tableStore instanceof KeyValueFileStore)) {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -128,7 +128,7 @@ public void refreshFiles(
List<DataFileMeta> beforeFiles,
List<DataFileMeta> dataFiles) {
LookupLevels<KeyValue> lookupLevels =
tableView.computeIfAbsent(partition, k -> new HashMap<>()).get(bucket);
tableView.computeIfAbsent(partition, k -> new ConcurrentHashMap<>()).get(bucket);
if (lookupLevels == null) {
Preconditions.checkArgument(
beforeFiles.isEmpty(),
Expand Down Expand Up @@ -182,14 +182,14 @@ private void newLookupLevels(BinaryRow partition, int bucket, List<DataFileMeta>
bfGenerator(options),
lookupFileCache);

tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket, lookupLevels);
tableView
.computeIfAbsent(partition, k -> new ConcurrentHashMap<>())
.put(bucket, lookupLevels);
}

/** TODO: remove synchronized and support multi-threaded lookup. */
@Nullable
@Override
public synchronized InternalRow lookup(BinaryRow partition, int bucket, InternalRow key)
throws IOException {
public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException {
Map<Integer, LookupLevels<KeyValue>> buckets = tableView.get(partition);
if (buckets == null || buckets.isEmpty()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ private Collection<RowData> lookup(RowData keyRow) {
Thread.currentThread()
.setContextClassLoader(AsyncLookupFunctionWrapper.class.getClassLoader());
try {
synchronized (function) {
return function.lookup(keyRow);
}
return function.lookup(keyRow);
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public Collection<RowData> lookup(RowData keyRow) {
}

@Nullable
private BinaryRow refreshDynamicPartition(boolean reopen) throws Exception {
private synchronized BinaryRow refreshDynamicPartition(boolean reopen) throws Exception {
if (partitionLoader == null) {
return null;
}
Expand Down Expand Up @@ -284,7 +284,7 @@ private Predicate createSpecificPartFilter(BinaryRow partition) {
return createPartitionPredicate(rowType, partitionMap);
}

private void reopen() {
private synchronized void reopen() {
try {
close();
open();
Expand All @@ -293,7 +293,7 @@ private void reopen() {
}
}

private void checkRefresh() throws Exception {
private synchronized void checkRefresh() throws Exception {
if (nextLoadTime > System.currentTimeMillis()) {
return;
}
Expand All @@ -319,7 +319,7 @@ private void refresh() throws Exception {
}

@Override
public void close() throws IOException {
public synchronized void close() throws IOException {
if (lookupTable != null) {
lookupTable.close();
lookupTable = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,9 @@ private void doRefresh() throws Exception {
}
}

/** TODO: remove synchronized and support multi-threaded lookup. */
@Override
public final List<InternalRow> get(InternalRow key) throws IOException {
public final synchronized List<InternalRow> get(InternalRow key) throws IOException {
List<InternalRow> values;
if (refreshAsync) {
synchronized (lock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.Triple;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -108,25 +109,31 @@ public void open() throws Exception {

@Override
public List<InternalRow> get(InternalRow key) throws IOException {
    // Partition/bucket extraction is synchronized internally (it mutates the shared
    // extractor); the actual lookup runs outside that lock.
    Triple<BinaryRow, Integer, InternalRow> target = extractPartitionAndBucket(key);
    InternalRow result = queryExecutor.lookup(target.f0, target.f1, target.f2);
    return result == null ? Collections.emptyList() : Collections.singletonList(result);
}

private synchronized Triple<BinaryRow, Integer, InternalRow> extractPartitionAndBucket(
InternalRow key) {
InternalRow adjustedKey = key;
if (keyRearrange != null) {
adjustedKey = keyRearrange.replaceRow(adjustedKey);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is reused

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have encapsulated these as a separate method called extractPartitionAndBucket, and it is declared synchronized, so there are no thread-safety issues.

}
extractor.setRecord(adjustedKey);
int bucket = extractor.bucket();
BinaryRow partition = extractor.partition();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is reused

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same as above


InternalRow trimmedKey = key;
if (trimmedKeyRearrange != null) {
trimmedKey = trimmedKeyRearrange.replaceRow(trimmedKey);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is reused

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same as above

}

InternalRow kv = queryExecutor.lookup(partition, bucket, trimmedKey);
if (kv == null) {
return Collections.emptyList();
} else {
return Collections.singletonList(kv);
}
return Triple.of(partition, bucket, trimmedKey);
}

@Override
Expand Down
Loading
Loading