Skip to content

Commit

Permalink
Preprocessing stage (#25)
Browse files Browse the repository at this point in the history
* Implement Preprocessing Stage
* Make specterutils DependencyResolutionProcessor and LintingProcessors implement the Preprocessors as well
  • Loading branch information
jwillp authored Sep 10, 2024
1 parent 3712982 commit f8fbd69
Show file tree
Hide file tree
Showing 10 changed files with 497 additions and 88 deletions.
50 changes: 32 additions & 18 deletions pkg/specter/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ const (
StopAfterSourceLoadingStage RunMode = "stop-after-source-loading-stage"
StopAfterUnitLoadingStage RunMode = "stop-after-unit-loading-stage"
StopAfterUnitProcessingStage RunMode = "stop-after-unit-processing-stage"
StopAfterPreprocessingStage RunMode = "stop-after-preprocessing-stage"
)

const SourceLoadingFailedErrorCode = "specter.source_loading_failed"
const UnitLoadingFailedErrorCode = "specter.unit_loading_failed"
const UnitPreprocessingFailedErrorCode = "specter.unit_preprocessing_failed"
const UnitProcessingFailedErrorCode = "specter.unit_processing_failed"
const ArtifactProcessingFailedErrorCode = "specter.artifact_processing_failed"

Expand Down Expand Up @@ -57,53 +59,65 @@ type PipelineContextData struct {
}

type Pipeline interface {
Run(ctx context.Context, sourceLocations []string, runMode RunMode) (result PipelineResult, err error)
Run(ctx context.Context, runMode RunMode, sourceLocations []string) (PipelineResult, error)
}

type SourceLoadingStage interface {
Run(ctx PipelineContext, sourceLocations []string) ([]Source, error)
}

type UnitLoadingStage interface {
Run(ctx PipelineContext, sources []Source) ([]Unit, error)
Run(PipelineContext, []Source) ([]Unit, error)
}

type UnitPreprocessingStage interface {
Run(PipelineContext, []Unit) ([]Unit, error)
}

type UnitProcessingStage interface {
Run(ctx PipelineContext, units []Unit) ([]Artifact, error)
Run(PipelineContext, []Unit) ([]Artifact, error)
}

type ArtifactProcessingStage interface {
Run(ctx PipelineContext, artifacts []Artifact) error
Run(PipelineContext, []Artifact) error
}

type SourceLoadingStageHooks interface {
Before(ctx PipelineContext) error
After(ctx PipelineContext) error
Before(PipelineContext) error
After(PipelineContext) error
BeforeSourceLocation(ctx PipelineContext, sourceLocation string) error
AfterSourceLocation(ctx PipelineContext, sourceLocation string) error
OnError(ctx PipelineContext, err error) error
OnError(PipelineContext, error) error
}

type UnitLoadingStageHooks interface {
Before(ctx PipelineContext) error
After(ctx PipelineContext) error
BeforeSource(ctx PipelineContext, source Source) error
AfterSource(ctx PipelineContext, source Source) error
OnError(ctx PipelineContext, err error) error
Before(PipelineContext) error
After(PipelineContext) error
BeforeSource(PipelineContext, Source) error
AfterSource(PipelineContext, Source) error
OnError(PipelineContext, error) error
}

type UnitPreprocessingStageHooks interface {
Before(PipelineContext) error
After(PipelineContext) error
BeforePreprocessor(ctx PipelineContext, preprocessorName string) error
AfterPreprocessor(ctx PipelineContext, preprocessorName string) error
OnError(PipelineContext, error) error
}

type UnitProcessingStageHooks interface {
Before(ctx PipelineContext) error
After(ctx PipelineContext) error
Before(PipelineContext) error
After(PipelineContext) error
BeforeProcessor(ctx PipelineContext, processorName string) error
AfterProcessor(ctx PipelineContext, processorName string) error
OnError(ctx PipelineContext, err error) error
OnError(PipelineContext, error) error
}

type ArtifactProcessingStageHooks interface {
Before(ctx PipelineContext) error
After(ctx PipelineContext) error
Before(PipelineContext) error
After(PipelineContext) error
BeforeProcessor(ctx PipelineContext, processorName string) error
AfterProcessor(ctx PipelineContext, processorName string) error
OnError(ctx PipelineContext, err error) error
OnError(PipelineContext, error) error
}
175 changes: 134 additions & 41 deletions pkg/specter/pipelinedefault.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ type DefaultPipeline struct {

SourceLoadingStage SourceLoadingStage
UnitLoadingStage UnitLoadingStage
UnitPreprocessingStage UnitPreprocessingStage
UnitProcessingStage UnitProcessingStage
ArtifactProcessingStage ArtifactProcessingStage
}

// Run the DefaultPipeline from start to finish.
func (p DefaultPipeline) Run(ctx context.Context, sourceLocations []string, runMode RunMode) (PipelineResult, error) {
func (p DefaultPipeline) Run(ctx context.Context, runMode RunMode, sourceLocations []string) (PipelineResult, error) {
if runMode == "" {
runMode = RunThrough
}
Expand Down Expand Up @@ -70,6 +71,13 @@ func (p DefaultPipeline) run(pctx *PipelineContext, sourceLocations []string, ru
return nil
}

if err := p.runUnitPreprocessingStage(pctx); err != nil {
return err
}
if runMode == StopAfterPreprocessingStage {
return nil
}

if err := p.runUnitProcessingStage(pctx); err != nil {
return err
}
Expand All @@ -83,14 +91,14 @@ func (p DefaultPipeline) run(pctx *PipelineContext, sourceLocations []string, ru
return nil
}

func (p DefaultPipeline) runSourceLoadingStage(pctx *PipelineContext, sourceLocations []string) error {
if err := pctx.Err(); err != nil {
func (p DefaultPipeline) runSourceLoadingStage(ctx *PipelineContext, sourceLocations []string) error {
if err := ctx.Err(); err != nil {
return err
}

var err error
if p.SourceLoadingStage != nil {
pctx.Sources, err = p.SourceLoadingStage.Run(*pctx, sourceLocations)
ctx.Sources, err = p.SourceLoadingStage.Run(*ctx, sourceLocations)
if err != nil {
return errors.WrapWithMessage(err, SourceLoadingFailedErrorCode, "failed loading sources")
}
Expand All @@ -99,14 +107,14 @@ func (p DefaultPipeline) runSourceLoadingStage(pctx *PipelineContext, sourceLoca
return nil
}

func (p DefaultPipeline) runUnitLoadingStage(pctx *PipelineContext) error {
if err := pctx.Err(); err != nil {
func (p DefaultPipeline) runUnitLoadingStage(ctx *PipelineContext) error {
if err := ctx.Err(); err != nil {
return err
}

var err error
if p.UnitLoadingStage != nil {
pctx.Units, err = p.UnitLoadingStage.Run(*pctx, pctx.Sources)
ctx.Units, err = p.UnitLoadingStage.Run(*ctx, ctx.Sources)
if err != nil {
return errors.WrapWithMessage(err, UnitLoadingFailedErrorCode, "failed loading units")
}
Expand All @@ -115,47 +123,48 @@ func (p DefaultPipeline) runUnitLoadingStage(pctx *PipelineContext) error {
return nil
}

func (p DefaultPipeline) runUnitProcessingStage(pctx *PipelineContext) error {
if err := pctx.Err(); err != nil {
func (p DefaultPipeline) runUnitProcessingStage(ctx *PipelineContext) error {
if err := ctx.Err(); err != nil {
return err
}

var err error
if p.UnitProcessingStage != nil {
pctx.Artifacts, err = p.UnitProcessingStage.Run(*pctx, pctx.Units)
var err error
ctx.Artifacts, err = p.UnitProcessingStage.Run(*ctx, ctx.Units)
if err != nil {
return errors.WrapWithMessage(err, UnitProcessingFailedErrorCode, "failed processing units")
}
}
return nil
}

func (p DefaultPipeline) runArtifactProcessingStage(pctx *PipelineContext) error {
if err := pctx.Err(); err != nil {
func (p DefaultPipeline) runArtifactProcessingStage(ctx *PipelineContext) error {
if err := ctx.Err(); err != nil {
return err
}

if p.ArtifactProcessingStage != nil {
if err := p.ArtifactProcessingStage.Run(*pctx, pctx.Artifacts); err != nil {
if err := p.ArtifactProcessingStage.Run(*ctx, ctx.Artifacts); err != nil {
return errors.WrapWithMessage(err, ArtifactProcessingFailedErrorCode, "failed processing artifacts")
}
}
return nil
}

type SourceLoadingStageHooksAdapter struct{}
func (p DefaultPipeline) runUnitPreprocessingStage(ctx *PipelineContext) error {
if err := ctx.Err(); err != nil {
return err
}

func (_ SourceLoadingStageHooksAdapter) Before(_ PipelineContext) error { return nil }
func (_ SourceLoadingStageHooksAdapter) After(_ PipelineContext) error { return nil }
func (_ SourceLoadingStageHooksAdapter) BeforeSourceLocation(_ PipelineContext, _ string) error {
return nil
}
func (_ SourceLoadingStageHooksAdapter) AfterSourceLocation(_ PipelineContext, _ string) error {
if p.UnitPreprocessingStage != nil {
var err error
ctx.Units, err = p.UnitPreprocessingStage.Run(*ctx, ctx.Units)
if err != nil {
return errors.WrapWithMessage(err, UnitPreprocessingFailedErrorCode, "failed preprocessing units")
}
}
return nil
}
func (_ SourceLoadingStageHooksAdapter) OnError(_ PipelineContext, err error) error {
return err
}

type sourceLoadingStage struct {
SourceLoaders []SourceLoader
Expand Down Expand Up @@ -212,7 +221,7 @@ func (s sourceLoadingStage) run(ctx PipelineContext, sourceLocations []string) (
return ctx.Sources, errors.GroupOrNil(errs)
}

func (s sourceLoadingStage) processSourceLocation(ctx PipelineContext, sl string) ([]Source, error) {
func (s sourceLoadingStage) processSourceLocation(_ PipelineContext, sl string) ([]Source, error) {
var sources []Source
for _, l := range s.SourceLoaders {
if !l.Supports(sl) {
Expand All @@ -227,26 +236,19 @@ func (s sourceLoadingStage) processSourceLocation(ctx PipelineContext, sl string
return sources, nil
}

type UnitLoadingStageHooksAdapter struct{}

func (_ UnitLoadingStageHooksAdapter) Before(_ PipelineContext) error { return nil }
func (_ UnitLoadingStageHooksAdapter) After(_ PipelineContext) error { return nil }
func (_ UnitLoadingStageHooksAdapter) BeforeSource(_ PipelineContext, _ Source) error { return nil }
func (_ UnitLoadingStageHooksAdapter) AfterSource(_ PipelineContext, _ Source) error { return nil }
func (_ UnitLoadingStageHooksAdapter) OnError(_ PipelineContext, err error) error { return err }

type UnitProcessingStageHooksAdapter struct {
}
type SourceLoadingStageHooksAdapter struct{}

func (_ UnitProcessingStageHooksAdapter) Before(_ PipelineContext) error { return nil }
func (_ UnitProcessingStageHooksAdapter) After(_ PipelineContext) error { return nil }
func (_ UnitProcessingStageHooksAdapter) BeforeProcessor(_ PipelineContext, _ string) error {
func (_ SourceLoadingStageHooksAdapter) Before(_ PipelineContext) error { return nil }
func (_ SourceLoadingStageHooksAdapter) After(_ PipelineContext) error { return nil }
func (_ SourceLoadingStageHooksAdapter) BeforeSourceLocation(_ PipelineContext, _ string) error {
return nil
}
func (_ UnitProcessingStageHooksAdapter) AfterProcessor(_ PipelineContext, _ string) error {
func (_ SourceLoadingStageHooksAdapter) AfterSourceLocation(_ PipelineContext, _ string) error {
return nil
}
func (_ UnitProcessingStageHooksAdapter) OnError(_ PipelineContext, err error) error { return err }
func (_ SourceLoadingStageHooksAdapter) OnError(_ PipelineContext, err error) error {
return err
}

type unitLoadingStage struct {
Loaders []UnitLoader
Expand Down Expand Up @@ -314,6 +316,84 @@ func (s unitLoadingStage) runLoader(src Source) ([]Unit, error) {
return units, nil
}

type UnitLoadingStageHooksAdapter struct{}

func (_ UnitLoadingStageHooksAdapter) Before(_ PipelineContext) error { return nil }
func (_ UnitLoadingStageHooksAdapter) After(_ PipelineContext) error { return nil }
func (_ UnitLoadingStageHooksAdapter) BeforeSource(_ PipelineContext, _ Source) error { return nil }
func (_ UnitLoadingStageHooksAdapter) AfterSource(_ PipelineContext, _ Source) error { return nil }
func (_ UnitLoadingStageHooksAdapter) OnError(_ PipelineContext, err error) error { return err }

type unitPreprocessingStage struct {
Preprocessors []UnitPreprocessor
Hooks UnitPreprocessingStageHooks
}

func (s unitPreprocessingStage) Run(ctx PipelineContext, units []Unit) ([]Unit, error) {
if s.Hooks == nil {
s.Hooks = UnitPreprocessingStageHooksAdapter{}
}

if err := s.Hooks.Before(ctx); err != nil {
return nil, newFailedToRunHookErr(err, "Before")
}

units, err := s.run(ctx, units)
if err != nil {
return nil, s.Hooks.OnError(ctx, errors.WrapWithMessage(err, UnitProcessingFailedErrorCode, "failed preprocessing units"))
}

if err := s.Hooks.After(ctx); err != nil {
return nil, newFailedToRunHookErr(err, "After")
}

return units, nil
}

func (s unitPreprocessingStage) run(ctx PipelineContext, units []Unit) ([]Unit, error) {
var err error
for _, p := range s.Preprocessors {
if err := ctx.Err(); err != nil {
return nil, err
}

if err := s.Hooks.BeforePreprocessor(ctx, p.Name()); err != nil {
return nil, newFailedToRunHookErr(err, "BeforePreprocessor")
}

units, err = p.Preprocess(ctx, units)
if err != nil {
return nil, fmt.Errorf("preprocessor %q returned an error: %w", p.Name(), err)
}

units = append(ctx.Units, units...)
ctx.Units = units

if err := s.Hooks.AfterPreprocessor(ctx, p.Name()); err != nil {
return nil, newFailedToRunHookErr(err, "AfterPreprocessor")
}
}

return units, nil
}

type UnitPreprocessingStageHooksAdapter struct {
}

func (u UnitPreprocessingStageHooksAdapter) Before(PipelineContext) error { return nil }

func (u UnitPreprocessingStageHooksAdapter) After(PipelineContext) error { return nil }

func (u UnitPreprocessingStageHooksAdapter) BeforePreprocessor(PipelineContext, string) error {
return nil
}

func (u UnitPreprocessingStageHooksAdapter) AfterPreprocessor(PipelineContext, string) error {
return nil
}

func (u UnitPreprocessingStageHooksAdapter) OnError(_ PipelineContext, err error) error { return err }

type unitProcessingStage struct {
Processors []UnitProcessor
Hooks UnitProcessingStageHooks
Expand Down Expand Up @@ -356,7 +436,7 @@ func (s unitProcessingStage) run(ctx PipelineContext, units []Unit) ([]Artifact,
Artifacts: ctx.Artifacts,
})
if err != nil {
return nil, fmt.Errorf("processor %q returned an error :%w", processor.Name(), err)
return nil, fmt.Errorf("processor %q returned an error: %w", processor.Name(), err)
}

ctx.Artifacts = append(ctx.Artifacts, artifacts...)
Expand All @@ -369,6 +449,19 @@ func (s unitProcessingStage) run(ctx PipelineContext, units []Unit) ([]Artifact,
return ctx.Artifacts, nil
}

type UnitProcessingStageHooksAdapter struct {
}

func (_ UnitProcessingStageHooksAdapter) Before(_ PipelineContext) error { return nil }
func (_ UnitProcessingStageHooksAdapter) After(_ PipelineContext) error { return nil }
func (_ UnitProcessingStageHooksAdapter) BeforeProcessor(_ PipelineContext, _ string) error {
return nil
}
func (_ UnitProcessingStageHooksAdapter) AfterProcessor(_ PipelineContext, _ string) error {
return nil
}
func (_ UnitProcessingStageHooksAdapter) OnError(_ PipelineContext, err error) error { return err }

type ArtifactProcessingStageHooksAdapter struct {
}

Expand Down
Loading

0 comments on commit f8fbd69

Please sign in to comment.