Skip to content

Commit

Permalink
[SPARK-12122][STREAMING] Prevent batches from being submitted twice a…
Browse files Browse the repository at this point in the history
…fter recovering StreamingContext from checkpoint

Author: Tathagata Das <[email protected]>

Closes apache#10127 from tdas/SPARK-12122.
  • Loading branch information
tdas committed Dec 4, 2015
1 parent 5011f26 commit 4106d80
Showing 1 changed file with 2 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
pendingTimes.mkString(", "))
// Reschedule jobs for these times
val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
val timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime }
.distinct.sorted(Time.ordering)
logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
timesToReschedule.mkString(", "))
timesToReschedule.foreach { time =>
Expand Down

0 comments on commit 4106d80

Please sign in to comment.