This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Adding running on k8s.md #552

Open · wants to merge 30 commits into base: master

Commits (30):
- fb3636b [SPARK-22807][SCHEDULER] Remove config that says docker and replace w… (foxish, Dec 18, 2017)
- 772e464 [SPARK-20653][CORE] Add cleaning of old elements from the status store. (Dec 18, 2017)
- fbfa9be Revert "Revert "[SPARK-22496][SQL] thrift server adds operation logs"" (HyukjinKwon, Dec 18, 2017)
- 3a07eff [SPARK-22813][BUILD] Use lsof or /usr/sbin/lsof in run-tests.py (kiszk, Dec 18, 2017)
- 0609dcc [SPARK-22777][SCHEDULER] Kubernetes mode dockerfile permission and di… (foxish, Dec 18, 2017)
- d4e6959 [MINOR][SQL] Remove Useless zipWithIndex from ResolveAliases (gatorsmile, Dec 19, 2017)
- ab7346f [SPARK-22673][SQL] InMemoryRelation should utilize existing stats whe… (CodingCat, Dec 19, 2017)
- 571aa27 [SPARK-21984][SQL] Join estimation based on equi-height histogram (wzhfy, Dec 19, 2017)
- 2831571 [SPARK-22791][SQL][SS] Redact Output of Explain (gatorsmile, Dec 19, 2017)
- b779c93 [SPARK-22815][SQL] Keep PromotePrecision in Optimized Plans (gatorsmile, Dec 19, 2017)
- ee56fc3 [SPARK-18016][SQL] Code Generation: Constant Pool Limit - reduce entr… (kiszk, Dec 19, 2017)
- ef10f45 [SPARK-21652][SQL][FOLLOW-UP] Fix rule conflict caused by InferFilter… (gatorsmile, Dec 19, 2017)
- 6129ffa [SPARK-22821][TEST] Basic tests for WidenSetOperationTypes, BooleanEq… (wangyum, Dec 19, 2017)
- 3a7494d [SPARK-22827][CORE] Avoid throwing OutOfMemoryError in case of except… (Dec 20, 2017)
- 6e36d8d [SPARK-22829] Add new built-in function date_trunc() (youngbink, Dec 20, 2017)
- 13268a5 [SPARK-22649][PYTHON][SQL] Adding localCheckpoint to Dataset API (ferdonline, Dec 20, 2017)
- 9962390 [SPARK-22781][SS] Support creating streaming dataset with ORC files (dongjoon-hyun, Dec 20, 2017)
- 2720c88 Adding running on k8s.md (foxish, Nov 10, 2017)
- 4ccf59b Address comments (foxish, Dec 12, 2017)
- 14bee00 Adding links to running-on-kubernetes.md (foxish, Dec 12, 2017)
- b18d1ba Addressed comments (round 2) (foxish, Dec 13, 2017)
- 9594462 review comments (foxish, Dec 13, 2017)
- 679b5c7 Update to --name (foxish, Dec 13, 2017)
- 67abb93 Review comments (foxish, Dec 14, 2017)
- a7e0c4c Update the image building script (foxish, Dec 20, 2017)
- 74ac5c9 Add path to dockerfiles (foxish, Dec 20, 2017)
- d235847 Test and update various options (foxish, Dec 20, 2017)
- 702162b Add `..` blocks (foxish, Dec 20, 2017)
- 8726154 Update instructions for spark.app.name, modify config type (foxish, Dec 21, 2017)
- 374ddc8 Better line breaks (foxish, Dec 21, 2017)
@@ -88,7 +88,7 @@ public void spill() throws IOException {
* `LongArray` is too large to fit in a single page. The caller side should take care of these
* two exceptions, or make sure the `size` is small enough that won't trigger exceptions.
*
* @throws OutOfMemoryError
* @throws SparkOutOfMemoryError
* @throws TooLargePageException
*/
public LongArray allocateArray(long size) {
@@ -154,6 +154,6 @@ private void throwOom(final MemoryBlock page, final long required) {
taskMemoryManager.freePage(page, this);
}
taskMemoryManager.showMemoryUsage();
throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
throw new SparkOutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
}
}
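
The javadoc change above keeps the contract that callers of allocateArray() must handle both failure modes themselves. As a rough illustration of that caller-side handling (the helper class and method names below are hypothetical and not part of this PR), a caller could spill on a too-large request and treat a Spark-level memory failure as a recoverable condition:

import java.io.IOException;

import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TooLargePageException;
import org.apache.spark.unsafe.array.LongArray;

// Hypothetical helper, not part of this PR: sketches the caller-side handling that the
// allocateArray() javadoc asks for.
final class PointerArrayAllocator {

  // Tries to allocate `size` entries; returns null if the consumer had to give up.
  static LongArray tryAllocate(MemoryConsumer consumer, long size) throws IOException {
    try {
      return consumer.allocateArray(size);
    } catch (TooLargePageException e) {
      // The requested array cannot fit in a single memory page: spill and report failure.
      consumer.spill();
      return null;
    } catch (SparkOutOfMemoryError e) {
      // This task could not acquire enough Spark-managed memory; report failure to the
      // caller instead of letting an Error bring down the executor JVM.
      return null;
    }
  }

  private PointerArrayAllocator() {}
}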
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.memory;

import org.apache.spark.annotation.Private;

/**
 * This exception is thrown when a task cannot acquire memory from the memory manager.
 * Instead of throwing {@link OutOfMemoryError}, which kills the executor,
 * we should throw this exception, which only kills the current task.
*/
@Private
public final class SparkOutOfMemoryError extends OutOfMemoryError {

public SparkOutOfMemoryError(String s) {
super(s);
}

public SparkOutOfMemoryError(OutOfMemoryError e) {
super(e.getMessage());
}
}
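
Because SparkOutOfMemoryError extends OutOfMemoryError, existing catch (OutOfMemoryError) blocks continue to see it; code that wants to treat a per-task memory failure differently only needs to catch the subclass first. A minimal, self-contained sketch of that distinction (assumed demo class, not from the PR):

import org.apache.spark.memory.SparkOutOfMemoryError;

// Standalone demo, not part of this PR: a SparkOutOfMemoryError can be handled per task,
// while a plain OutOfMemoryError should still be treated as a JVM-level failure.
public class SparkOomDemo {
  public static void main(String[] args) {
    try {
      // Simulate a task that failed to acquire execution memory.
      throw new SparkOutOfMemoryError("Unable to acquire 65536 bytes of memory, got 0");
    } catch (SparkOutOfMemoryError e) {
      // Task-level failure: fail this task only; the executor JVM is still healthy.
      System.err.println("Task failed: " + e.getMessage());
    } catch (OutOfMemoryError e) {
      // A genuine JVM out-of-memory error: do not swallow it here.
      throw e;
    }
  }
}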
@@ -192,7 +192,7 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
throw new RuntimeException(e.getMessage());
} catch (IOException e) {
logger.error("error while calling spill() on " + c, e);
throw new OutOfMemoryError("error while calling spill() on " + c + " : "
throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
+ e.getMessage());
}
}
@@ -213,7 +213,7 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
throw new RuntimeException(e.getMessage());
} catch (IOException e) {
logger.error("error while calling spill() on " + consumer, e);
throw new OutOfMemoryError("error while calling spill() on " + consumer + " : "
throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
+ e.getMessage());
}
}
@@ -33,6 +33,7 @@
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.internal.config.package$;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.memory.TooLargePageException;
import org.apache.spark.serializer.DummySerializerInstance;
@@ -337,7 +338,7 @@ private void growPointerArrayIfNecessary() throws IOException {
// The pointer array is too big to fit in a single page, spill.
spill();
return;
} catch (OutOfMemoryError e) {
} catch (SparkOutOfMemoryError e) {
// should have triggered spilling
if (!inMemSorter.hasSpaceForAnotherRecord()) {
logger.error("Unable to grow the pointer array");
@@ -25,6 +25,7 @@
import java.util.function.Supplier;

import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -349,7 +350,7 @@ private void growPointerArrayIfNecessary() throws IOException {
// The pointer array is too big to fit in a single page, spill.
spill();
return;
} catch (OutOfMemoryError e) {
} catch (SparkOutOfMemoryError e) {
// should have triggered spilling
if (!inMemSorter.hasSpaceForAnotherRecord()) {
logger.error("Unable to grow the pointer array");
@@ -24,6 +24,7 @@

import org.apache.spark.TaskContext;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.UnsafeAlignedOffset;
@@ -212,7 +213,7 @@ public boolean hasSpaceForAnotherRecord() {

public void expandPointerArray(LongArray newArray) {
if (newArray.size() < array.size()) {
throw new OutOfMemoryError("Not enough memory to grow pointer array");
throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
}
Platform.copyMemory(
array.getBaseObject(),
@@ -44,6 +44,7 @@ import org.apache.spark.scheduler.ReplayListenerBus._
import org.apache.spark.status._
import org.apache.spark.status.KVUtils._
import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
import org.apache.spark.status.config._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
import org.apache.spark.util.kvstore._
@@ -304,6 +305,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val (kvstore, needReplay) = uiStorePath match {
case Some(path) =>
try {
// The store path is not guaranteed to exist - maybe it hasn't been created, or was
// invalidated because changes to the event log were detected. Need to replay in that
// case.
val _replay = !path.isDirectory()
(createDiskStore(path, conf), _replay)
} catch {
@@ -318,24 +322,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
(new InMemoryStore(), true)
}

val trackingStore = new ElementTrackingStore(kvstore, conf)
if (needReplay) {
val replayBus = new ReplayListenerBus()
val listener = new AppStatusListener(kvstore, conf, false,
val listener = new AppStatusListener(trackingStore, conf, false,
lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
replayBus.addListener(listener)
AppStatusPlugin.loadPlugins().foreach { plugin =>
plugin.setupListeners(conf, kvstore, l => replayBus.addListener(l), false)
plugin.setupListeners(conf, trackingStore, l => replayBus.addListener(l), false)
}
try {
val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
listener.flush()
trackingStore.close(false)
} catch {
case e: Exception =>
try {
kvstore.close()
} catch {
case _e: Exception => logInfo("Error closing store.", _e)
Utils.tryLogNonFatalError {
trackingStore.close()
}
uiStorePath.foreach(Utils.deleteRecursively)
if (e.isInstanceOf[FileNotFoundException]) {
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -35,7 +35,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
import org.apache.spark.shuffle.FetchFailedException
@@ -553,10 +553,9 @@ private[spark] class Executor(

// Don't forcibly exit unless the exception was inherently fatal, to avoid
// stopping other tasks unnecessarily.
if (Utils.isFatalError(t)) {
if (!t.isInstanceOf[SparkOutOfMemoryError] && Utils.isFatalError(t)) {
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t)
}

} finally {
runningTasks.remove(taskId)
}
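
The executor change above only hands an error to the uncaught-exception handler (which exits the JVM) when it is fatal and not a Spark-managed memory failure. A rough Java rendering of that decision, for consistency with the earlier sketches (the helper name and the simplified fatality check below are assumptions standing in for Utils.isFatalError, not the PR's code):

import org.apache.spark.memory.SparkOutOfMemoryError;

// Hypothetical sketch, not the PR's code: escalate only fatal errors that are not
// SparkOutOfMemoryError, so a task-level memory failure never takes down the executor.
final class FatalErrorFilter {

  static void maybeEscalate(Throwable t, Thread.UncaughtExceptionHandler handler) {
    if (!(t instanceof SparkOutOfMemoryError) && isFatal(t)) {
      handler.uncaughtException(Thread.currentThread(), t);
    }
  }

  // Simplified approximation of Utils.isFatalError, for illustration only.
  private static boolean isFatal(Throwable t) {
    return t instanceof VirtualMachineError
        || t instanceof ThreadDeath
        || t instanceof LinkageError;
  }

  private FatalErrorFilter() {}
}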
@@ -240,11 +240,6 @@ package object config {
.stringConf
.createOptional

// To limit memory usage, we only track information for a fixed number of tasks
private[spark] val UI_RETAINED_TASKS = ConfigBuilder("spark.ui.retainedTasks")
.intConf
.createWithDefault(100000)

// To limit how many applications are shown in the History Server summary ui
private[spark] val HISTORY_UI_MAX_APPS =
ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE)