From ac212737febf252c667f4425f780163b243e4f3d Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Fri, 23 Feb 2024 13:56:33 -0800 Subject: [PATCH] set env in Task --- .../scala/org/apache/spark/executor/Executor.scala | 1 + .../scala/org/apache/spark/scheduler/Task.scala | 14 +++++++++----- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index dae00a72285d4..0a3067c5a71e2 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -589,6 +589,7 @@ private[spark] class Executor( taskDescription.serializedTask, Thread.currentThread.getContextClassLoader) task.localProperties = taskDescription.properties task.setTaskMemoryManager(taskMemoryManager) + task.setEnv(env) // If this task has been killed before we deserialized it, let's quit now. Otherwise, // continue executing the task. diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 1ecd185de557a..ba880d0e69aa0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -126,7 +126,7 @@ private[spark] abstract class Task[T]( new CallerContext( "TASK", - SparkEnv.get.conf.get(APP_CALLER_CONTEXT), + env.conf.get(APP_CALLER_CONTEXT), appId, appAttemptId, jobId, @@ -143,15 +143,14 @@ private[spark] abstract class Task[T]( try { Utils.tryLogNonFatalError { // Release memory used by this thread for unrolling blocks - SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP) - SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask( - MemoryMode.OFF_HEAP) + env.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP) + env.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP) // Notify any tasks waiting for execution memory to be freed to wake up and try to // acquire memory again. This makes impossible the scenario where a task sleeps forever // because there are no other tasks left to notify it. Since this is safe to do but may // not be strictly necessary, we should revisit whether we can remove this in the // future. - val memoryManager = SparkEnv.get.memoryManager + val memoryManager = env.memoryManager memoryManager.synchronized { memoryManager.notifyAll() } } } finally { @@ -164,11 +163,16 @@ private[spark] abstract class Task[T]( } private var taskMemoryManager: TaskMemoryManager = _ + private var env: SparkEnv = _ def setTaskMemoryManager(taskMemoryManager: TaskMemoryManager): Unit = { this.taskMemoryManager = taskMemoryManager } + def setEnv(env: SparkEnv): Unit = { + this.env = env + } + def runTask(context: TaskContext): T def preferredLocations: Seq[TaskLocation] = Nil