Skip to content

Commit

Permalink
Improve initialization by processing documents in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelbey committed Apr 23, 2024
1 parent d8987a6 commit 424a183
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.eclipse.collections.api.factory.Lists;
Expand Down Expand Up @@ -326,30 +327,42 @@ public CompileResult getCompileResult(SectionState sectionState)
{
DocumentState documentState = sectionState.getDocumentState();
GlobalState globalState = documentState.getGlobalState();
return globalState.getProperty(COMPILE_RESULT, () -> tryCompile(globalState, documentState, sectionState));
// when looking for compile results, there might be another thread working on it already
// if current thread is the one that sets the completable future, then do the actual compilation and set it on the completable future
// if current thread does not set the completable future, then just join and wait for result from another thread
CompletableFuture<CompileResult> maybeCompileResultFuture = new CompletableFuture<>();
CompletableFuture<CompileResult> compileResultFuture = globalState.getProperty(COMPILE_RESULT, () -> maybeCompileResultFuture);

if (compileResultFuture == maybeCompileResultFuture)
{
compileResultFuture.complete(this.tryCompile(globalState, documentState, sectionState));
}

return compileResultFuture.join();
}

protected CompileResult tryCompile(GlobalState globalState, DocumentState documentState, SectionState sectionState)
{
long started = System.currentTimeMillis();
globalState.logInfo("Starting compilation");
PureModelContextData pureModelContextData = null;
try
{
pureModelContextData = buildPureModelContextData(globalState);
PureModel pureModel = Compiler.compile(pureModelContextData, DeploymentMode.PROD, "");
globalState.logInfo("Compilation completed successfully");
globalState.logInfo("Compilation completed successfully in " + (System.currentTimeMillis() - started) + "ms");
return new CompileResult(pureModel, pureModelContextData);
}
catch (EngineException e)
{
SourceInformation sourceInfo = e.getSourceInformation();
if (isValidSourceInfo(sourceInfo))
{
globalState.logInfo("Compilation completed with error " + "(" + sourceInfo.sourceId + " " + SourceInformationUtil.toLocation(sourceInfo) + "): " + e.getMessage());
globalState.logInfo("Compilation completed in " + (System.currentTimeMillis() - started) + "ms with error " + "(" + sourceInfo.sourceId + " " + SourceInformationUtil.toLocation(sourceInfo) + "): " + e.getMessage());
}
else
{
globalState.logInfo("Compilation completed with error: " + e.getMessage());
globalState.logInfo("Compilation completed in " + (System.currentTimeMillis() - started) + "ms with error: " + e.getMessage());
globalState.logWarning("Invalid source information for compilation error");
LOGGER.warn("Invalid source information in exception during compilation requested for section {} of {}: {}", sectionState.getSectionNumber(), documentState.getDocumentId(), (sourceInfo == null) ? null : sourceInfo.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -300,6 +303,28 @@ public void forEachDocumentState(Consumer<? super DocumentState> consumer)
this.docStates.forEachValue(consumer::accept);
}

@Override
public CompletableFuture<Void> forEachDocumentStateParallel(Consumer<? super DocumentState> consumer)
{
this.docStates.forEachValue(consumer::accept);
return CompletableFuture.completedFuture(null);
}

@Override
public <RESULT> CompletableFuture<List<RESULT>> collectFromEachDocumentState(Function<? super DocumentState, List<RESULT>> func)
{
List<RESULT> results = this.docStates.stream().map(func).flatMap(List::stream).collect(Collectors.toList());
return CompletableFuture.completedFuture(results);
}

@Override
public <RESULT> CompletableFuture<List<RESULT>> collectFromEachDocumentSectionState(BiFunction<? super DocumentState, ? super SectionState, List<RESULT>> func)
{
List<RESULT> results = new ArrayList<>();
this.docStates.stream().forEach(x -> x.forEachSectionState(s -> results.addAll(func.apply(x, s))));
return CompletableFuture.completedFuture(results);
}

@Override
public Collection<LegendLSPGrammarExtension> getAvailableGrammarExtensions()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
package org.finos.legend.engine.ide.lsp.extension.state;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.finos.legend.engine.ide.lsp.extension.LegendLSPFeature;
import org.finos.legend.engine.ide.lsp.extension.LegendLSPGrammarExtension;
Expand All @@ -41,6 +45,26 @@ public interface GlobalState extends State
*/
void forEachDocumentState(Consumer<? super DocumentState> consumer);

/**
* Apply the given consumer to each document state. No particular order is guaranteed.
* Implementation could do this in parallel
*
* @param consumer document state consumer, needs to be threadsafe
* @return future to track when this completes
*/
CompletableFuture<Void> forEachDocumentStateParallel(Consumer<? super DocumentState> consumer);

/**
* Apply the given function to each document state, collecting its result in a list. No particular order is guaranteed.
* Implementation could do this in parallel.
*
* @param func function to apply to each document state, needs to be threadsafe
* @return future to with the collected results
*/
<RESULT> CompletableFuture<List<RESULT>> collectFromEachDocumentState(Function<? super DocumentState, List<RESULT>> func);

<RESULT> CompletableFuture<List<RESULT>> collectFromEachDocumentSectionState(BiFunction<? super DocumentState, ? super SectionState, List<RESULT>> func);

/**
* List of available grammar extensions. This is useful for extensions that need to dispatch to other extensions
* for further processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,14 @@ public String getProjectVersion()
@Override
public void initialized(InitializedParams params)
{
long start = System.currentTimeMillis();
checkReady();
this.classpathFactory.initialize(this);
CompletableFuture<Void> initializeExtensions = this.initializeExtensions();
CompletableFuture<Void> engineServerUrl = this.initializeEngineServerUrl();

CompletableFuture.allOf(initializeExtensions, engineServerUrl)
.thenRun(() -> this.logInfoToClient("Extension finished post-initialization"));
.thenRun(() -> this.logInfoToClient("Extension finished post-initialization in " + (System.currentTimeMillis() - start) + "ms"));

}

Expand Down Expand Up @@ -292,10 +293,16 @@ private CompletableFuture<Void> initializeExtensions()

return this.classpathFactory.create(Collections.unmodifiableSet(this.rootFolders))
.thenAccept(this.extensionGuard::initialize)
.thenRun(this.extensionGuard.wrapOnClasspath(this::reprocessDocuments))
.thenCompose(_x -> this.reprocessDocuments())
.thenRun(this.legendLanguageService::loadVirtualFileSystemContent)
// trigger compilation
.thenRun(this.extensionGuard.wrapOnClasspath(() -> this.globalState.forEachDocumentState(this.textDocumentService::getLegendDiagnostics)))
.thenCompose(_x -> this.globalState.forEachDocumentStateParallel(x ->
{
long diagnosticStarted = System.currentTimeMillis();
this.textDocumentService.getLegendDiagnostics(x);
LOGGER.info("Diagnostics computed for {} took {}ms", x.getDocumentId(), System.currentTimeMillis() - diagnosticStarted);

}))
.thenRun(() ->
{
LanguageClient languageClient = this.getLanguageClient();
Expand All @@ -314,10 +321,14 @@ private CompletableFuture<Void> initializeExtensions()
});
}

private void reprocessDocuments()
private CompletableFuture<Void> reprocessDocuments()
{
this.globalState.forEachDocumentState(x -> ((LegendServerGlobalState.LegendServerDocumentState) x).recreateSectionStates());
this.globalState.clearProperties();
return this.globalState.forEachDocumentStateParallel(x ->
{
long startTime = System.currentTimeMillis();
((LegendServerGlobalState.LegendServerDocumentState) x).recreateSectionStates();
LOGGER.info("Reprocessing {} took {}ms", x.getDocumentId(), System.currentTimeMillis() - startTime);
}).thenRun(this.globalState::clearProperties);
}

@Override
Expand Down Expand Up @@ -484,10 +495,10 @@ public <T> CompletableFuture<T> supplyPossiblyAsync(Supplier<T> supplier)
return this.supplyPossiblyAsync_internal(this.extensionGuard.wrapOnClasspath(supplier));
}

void runPossiblyAsync(Runnable runnable)
CompletableFuture<?> runPossiblyAsync(Runnable runnable)
{
checkReady();
this.runPossiblyAsync_internal(this.extensionGuard.wrapOnClasspath(runnable));
return this.runPossiblyAsync_internal(this.extensionGuard.wrapOnClasspath(runnable));
}

private CompletableFuture<?> runPossiblyAsync_internal(Runnable work)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,24 +120,17 @@ public CompletableFuture<String> replClasspath()
@Override
public CompletableFuture<List<LegendTest>> testCases()
{
return this.server.supplyPossiblyAsync(() ->
{
List<LegendTest> commands = new ArrayList<>();

this.server.getGlobalState().forEachDocumentState(docState ->
{
docState.forEachSectionState(sectionState ->
return this.server.getGlobalState().collectFromEachDocumentSectionState((docState, sectionState) ->
{
List<LegendTest> commands = new ArrayList<>();
LegendLSPGrammarExtension extension = sectionState.getExtension();
if (extension != null)
{
commands.addAll(extension.testCases(sectionState));
}
});
});

return commands;
});
return commands;
}
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.finos.legend.engine.ide.lsp.extension.LegendLSPFeature;
import org.finos.legend.engine.ide.lsp.extension.LegendLSPGrammarExtension;
Expand Down Expand Up @@ -64,6 +68,37 @@ public void forEachDocumentState(Consumer<? super DocumentState> consumer)
this.docs.values().forEach(consumer);
}

@Override
public CompletableFuture<Void> forEachDocumentStateParallel(Consumer<? super DocumentState> consumer)
{
return CompletableFuture.allOf(
this.docs.values()
.stream()
.map(x -> this.server.runPossiblyAsync(() -> consumer.accept(x)))
.toArray(CompletableFuture[]::new)
);
}

@Override
public <RESULT> CompletableFuture<List<RESULT>> collectFromEachDocumentState(Function<? super DocumentState, List<RESULT>> func)
{
return this.docs.values()
.stream()
.map(x -> this.server.supplyPossiblyAsync(() -> func.apply(x)))
.reduce((x, y) -> x.thenCombine(y, (r, l) -> Stream.concat(r.stream(), l.stream()).collect(Collectors.toList())))
.orElseGet(() -> CompletableFuture.completedFuture(List.of()));
}

@Override
public <RESULT> CompletableFuture<List<RESULT>> collectFromEachDocumentSectionState(BiFunction<? super DocumentState, ? super SectionState, List<RESULT>> func)
{
return this.docs.values()
.stream()
.flatMap(x -> x.collectFromEachSectionState(func))
.reduce((x, y) -> x.thenCombine(y, (r, l) -> Stream.concat(r.stream(), l.stream()).collect(Collectors.toList())))
.orElseGet(() -> CompletableFuture.completedFuture(List.of()));
}

@Override
public void logInfo(String message)
{
Expand Down Expand Up @@ -247,6 +282,16 @@ public void forEachSectionState(Consumer<? super SectionState> consumer)
}
}

private <RESULT> Stream<CompletableFuture<List<RESULT>>> collectFromEachSectionState(BiFunction<? super DocumentState, ? super SectionState, List<RESULT>> func)
{
List<LegendServerSectionState> currentSectionsStates = this.sectionStates;
if (currentSectionsStates != null)
{
return currentSectionsStates.stream().map(x -> this.globalState.server.supplyPossiblyAsync(() -> func.apply(this, x)));
}
return Stream.empty();
}

Integer getVersion()
{
return version;
Expand Down
Loading

0 comments on commit 424a183

Please sign in to comment.