Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eval: parallelize environment loading #393

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 89 additions & 48 deletions eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,14 @@ func evalEnvironment(
return nil, nil
}

ec := newEvalContext(ctx, validating, name, env, decrypter, providers, envs, map[string]*imported{}, execContext, showSecrets)
loader := newLoader(ctx, envs)
ec := newEvalContext(ctx, validating, name, env, decrypter, providers, loader, map[string]*imported{}, execContext, showSecrets)

diags := ec.load()
if diags.HasErrors() {
return nil, diags
}

v, diags := ec.evaluate()

s := schema.Never().Schema()
Expand All @@ -139,22 +146,23 @@ func evalEnvironment(
}

type imported struct {
evaluating bool
value *value
loading bool
ctx *evalContext
value *value
}

// An evalContext carries the state necessary to evaluate an environment.
type evalContext struct {
ctx context.Context // the cancellation context for evaluation
validating bool // true if we are only checking the environment
showSecrets bool // true if secrets should be decrypted during validation
name string // the name of the environment
env *ast.EnvironmentDecl // the root of the environment AST
decrypter Decrypter // the decrypter to use for the environment
providers ProviderLoader // the provider loader to use
environments EnvironmentLoader // the environment loader to use
imports map[string]*imported // the shared set of imported environments
execContext *esc.ExecContext // evaluation context used for interpolation
ctx context.Context // the cancellation context for evaluation
validating bool // true if we are only checking the environment
showSecrets bool // true if secrets should be decrypted during validation
name string // the name of the environment
env *ast.EnvironmentDecl // the root of the environment AST
decrypter Decrypter // the decrypter to use for the environment
providers ProviderLoader // the provider loader to use
loader *loader // the environment loader
imports map[string]*imported // the shared set of imported environments
execContext *esc.ExecContext // evaluation context used for interpolation

myContext *value // evaluated context to be used to interpolate properties
myImports *value // directly-imported environments
Expand All @@ -171,22 +179,22 @@ func newEvalContext(
env *ast.EnvironmentDecl,
decrypter Decrypter,
providers ProviderLoader,
environments EnvironmentLoader,
loader *loader,
imports map[string]*imported,
execContext *esc.ExecContext,
showSecrets bool,
) *evalContext {
return &evalContext{
ctx: ctx,
validating: validating,
showSecrets: showSecrets,
name: name,
env: env,
decrypter: decrypter,
providers: providers,
environments: environments,
imports: imports,
execContext: execContext.CopyForEnv(name),
ctx: ctx,
validating: validating,
showSecrets: showSecrets,
name: name,
env: env,
decrypter: decrypter,
providers: providers,
loader: loader,
imports: imports,
execContext: execContext.CopyForEnv(name),
}
}

Expand Down Expand Up @@ -349,6 +357,55 @@ func (e *evalContext) isReserveTopLevelKey(k string) bool {
}
}

func (e *evalContext) load() syntax.Diagnostics {
mine := &imported{loading: true, ctx: e}
defer func() { mine.loading = false }()
e.imports[e.name] = mine

loads := make([]*loadedEnvironment, len(e.env.Imports.GetElements()))
for i, entry := range e.env.Imports.GetElements() {
loads[i] = e.loadImport(entry)
}

for i, entry := range e.env.Imports.GetElements() {
l := loads[i]
if l == nil {
continue
}
<-l.done

e.diags.Extend(l.diags...)
if l.err != nil {
e.errorf(entry.Environment, "%s", l.err.Error())
continue
}

imp := newEvalContext(e.ctx, e.validating, l.name, l.env, l.dec, e.providers, e.loader, e.imports, e.execContext, e.showSecrets)
diags := imp.load()
e.diags.Extend(diags...)
}

return e.diags
}

func (e *evalContext) loadImport(decl *ast.ImportDecl) *loadedEnvironment {
// If the import does not have a name, there's nothing we can do. This can happen for environments
// with parse errors.
if decl.Environment == nil {
return nil
}
name := decl.Environment.Value

if imported, ok := e.imports[name]; ok {
if imported.loading {
e.diags.Extend(syntax.Error(decl.Syntax().Syntax().Range(), fmt.Sprintf("cyclic import of %v", name), decl.Syntax().Syntax().Path()))
}
return nil
}

return e.loader.load(name)
}

// evaluate drives the evaluation of the evalContext's environment.
func (e *evalContext) evaluate() (*value, syntax.Diagnostics) {
// Evaluate context. We prepare the context values to later evaluate interpolations.
Expand Down Expand Up @@ -393,10 +450,6 @@ func (e *evalContext) evaluateContext() {

// evaluateImports evaluates an environment's imports.
func (e *evalContext) evaluateImports() {
mine := &imported{evaluating: true}
defer func() { mine.evaluating = false }()
e.imports[e.name] = mine

myImports := map[string]*value{}
for _, entry := range e.env.Imports.GetElements() {
e.evaluateImport(myImports, entry)
Expand Down Expand Up @@ -432,34 +485,22 @@ func (e *evalContext) evaluateImport(myImports map[string]*value, decl *ast.Impo
}
name := decl.Environment.Value

imported, ok := e.imports[name]
if !ok {
e.diags.Extend(syntax.Error(decl.Syntax().Syntax().Range(), fmt.Sprintf("internal error: missing context for %v", name), decl.Syntax().Syntax().Path()))
return
}

merge := true
if decl.Meta != nil && decl.Meta.Merge != nil {
merge = decl.Meta.Merge.Value
}

var val *value
if imported, ok := e.imports[name]; ok {
if imported.evaluating {
e.diags.Extend(syntax.Error(decl.Syntax().Syntax().Range(), fmt.Sprintf("cyclic import of %v", name), decl.Syntax().Syntax().Path()))
return
}
if imported.value != nil {
val = imported.value
} else {
bytes, dec, err := e.environments.LoadEnvironment(e.ctx, name)
if err != nil {
e.errorf(decl.Environment, "%s", err.Error())
return
}

env, diags, err := LoadYAMLBytes(name, bytes)
e.diags.Extend(diags...)
if err != nil {
e.errorf(decl.Environment, "%s", err.Error())
return
}

imp := newEvalContext(e.ctx, e.validating, name, env, dec, e.providers, e.environments, e.imports, e.execContext, e.showSecrets)
v, diags := imp.evaluate()
v, diags := imported.ctx.evaluate()
e.diags.Extend(diags...)

val = v
Expand Down
2 changes: 2 additions & 0 deletions eval/eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ func benchmarkEval(b *testing.B, openDelay, loadDelay time.Duration) {
envs, err := newBenchEnvironments(basePath, loadDelay)
require.NoError(b, err)

b.ResetTimer()

for i := 0; i < b.N; i++ {
execContext, err := esc.NewExecContext(map[string]esc.Value{
"pulumi": esc.NewValue(map[string]esc.Value{
Expand Down
71 changes: 71 additions & 0 deletions eval/loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2024, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package eval

import (
"context"

"github.com/pulumi/esc/ast"
"github.com/pulumi/esc/syntax"
)

type loadedEnvironment struct {
done <-chan bool

name string
env *ast.EnvironmentDecl
dec Decrypter
diags syntax.Diagnostics
err error
}

type loader struct {
ctx context.Context
environments EnvironmentLoader
loaded map[string]*loadedEnvironment
}

func newLoader(ctx context.Context, environments EnvironmentLoader) *loader {
return &loader{
ctx: ctx,
environments: environments,
loaded: map[string]*loadedEnvironment{},
}
}

func (l *loader) load(name string) *loadedEnvironment {
if loaded, ok := l.loaded[name]; ok {
return loaded
}

done := make(chan bool)
result := &loadedEnvironment{done: done, name: name}
go func() {
defer close(done)

bytes, dec, err := l.environments.LoadEnvironment(l.ctx, name)
if err != nil {
result.err = err
return
}
result.dec = dec

result.env, result.diags, result.err = LoadYAMLBytes(name, bytes)
return

Check failure on line 66 in eval/loader.go

View workflow job for this annotation

GitHub Actions / lint / lint

S1023: redundant `return` statement (gosimple)
}()

l.loaded[name] = result
return result
}
Loading
Loading