Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Add a test for ScatterGather showing failed deferred access with map #361

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

nh13
Copy link
Member

@nh13 nh13 commented Aug 22, 2019

This shows a bug when we use scatter.map(f).map(g) where the new task built in the second map (by g) requires that the task created in the first map (by f) has successfully been executed. If you look at groupBy or flatMap in SubScatter, you'll see that the defer execution of their method until previous tasks have completed. If you look at SecondarySubScatter, you'll see the source of the behavior for .map.map:

    def chain(source: Source): Seq[Result] = {
      val task: Result = f(source)
      this.tasks += task
      this.tasksDependingOnThisTask.foreach(other => task ==> other)
      this.tasksDependedOn.foreach(other => other ==> task)
      subs.foreach { sub => sub.chain(task).foreach { s => task ==> s } }
      Seq(task)
    }

The chain method is recursively called on each tracked SubScatter (i.e. subs), after applying f to the source. For each of those subs, their f method (i.e. g) will be applied to the provided task, but the provided task may not have been executed. I think we want to have a wrapper that defers chaining until the source task has completed successfully. We also call subs.foreach(_.connect()) in SubScatter which relies on the chain already being called.

@nh13 nh13 requested a review from tfenne August 22, 2019 05:19
@nh13 nh13 added the bug label Aug 22, 2019
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants