[SPARK-13848][SPARK-5185] Update to Py4J 0.9.2 in order to fix classloading issue

This patch upgrades Py4J from 0.9.1 to 0.9.2 in order to pick up an upstream fix that makes Py4J use the current thread's ContextClassLoader when performing reflection / class loading. This is necessary to fix [SPARK-5185](https://issues.apache.org/jira/browse/SPARK-5185), a longstanding issue affecting the use of `--jars` and `--packages` in PySpark.

To demonstrate that the fix works, I removed the workarounds that were added as part of [SPARK-6027](https://issues.apache.org/jira/browse/SPARK-6027) / apache#4779 and other patches.
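
To illustrate the class-loading change, here is a minimal sketch (not code from this commit; the function name is illustrative, and `sc` is assumed to be an active `SparkContext` with the Kafka assembly added via `--jars` or `--packages`). Before Py4J 0.9.2, PySpark had to reach such classes through the JVM thread's context class loader; with the upgrade, plain `sc._jvm` attribute access is enough. Both patterns appear in the hunks below.

```python
def get_kafka_helper(sc):
    """Sketch of the removed workaround vs. the new code path (illustration only)."""
    # Old workaround (pre-Py4J 0.9.2): Py4J resolved classes with its own
    # class loader, which could not see jars added via --jars / --packages,
    # so PySpark went through the thread's context class loader explicitly.
    loader = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader()
    helper_class = loader.loadClass(
        "org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
    helper_via_loader = helper_class.newInstance()

    # New code path (Py4J 0.9.2): Py4J itself performs reflection through the
    # context class loader, so direct attribute access now finds the class.
    helper_direct = sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()

    return helper_direct
```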

Py4J diff: py4j/py4j@0.9.1...0.9.2

/cc zsxwing tdas davies brkyvz

Author: Josh Rosen <[email protected]>

Closes apache#11687 from JoshRosen/py4j-0.9.2.
JoshRosen committed Mar 14, 2016
1 parent 6a4bfcd commit 07cb323
Showing 21 changed files with 38 additions and 65 deletions.
2 changes: 1 addition & 1 deletion LICENSE
@@ -263,7 +263,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-(The New BSD License) Py4J (net.sf.py4j:py4j:0.9.1 - http://py4j.sourceforge.net/)
+(The New BSD License) Py4J (net.sf.py4j:py4j:0.9.2 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
2 changes: 1 addition & 1 deletion bin/pyspark
@@ -67,7 +67,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.2-src.zip:$PYTHONPATH"

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)

set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9.1-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9.2-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -314,7 +314,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
-<version>0.9.1</version>
+<version>0.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
-pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9.1-src.zip").mkString(File.separator)
+pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9.2-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
pythonPath.mkString(File.pathSeparator)
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.2
@@ -153,7 +153,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.1.jar
+py4j-0.9.2.jar
pyrolite-4.9.jar
reflectasm-1.07-shaded.jar
scala-compiler-2.11.7.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.3
@@ -144,7 +144,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.1.jar
+py4j-0.9.2.jar
pyrolite-4.9.jar
reflectasm-1.07-shaded.jar
scala-compiler-2.11.7.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.4
@@ -145,7 +145,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.1.jar
+py4j-0.9.2.jar
pyrolite-4.9.jar
reflectasm-1.07-shaded.jar
scala-compiler-2.11.7.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.6
@@ -151,7 +151,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.1.jar
+py4j-0.9.2.jar
pyrolite-4.9.jar
reflectasm-1.07-shaded.jar
scala-compiler-2.11.7.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7
@@ -152,7 +152,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.1.jar
+py4j-0.9.2.jar
pyrolite-4.9.jar
reflectasm-1.07-shaded.jar
scala-compiler-2.11.7.jar
@@ -615,7 +615,7 @@ object KafkaUtils {
/**
* This is a helper class that wraps the KafkaUtils.createStream() into more
* Python-friendly class and function so that it can be easily
-* instantiated and called from Python's KafkaUtils (see SPARK-6027).
+* instantiated and called from Python's KafkaUtils.
*
* The zero-arg constructor helps instantiate this class from the Class object
* classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream()
2 changes: 1 addition & 1 deletion python/docs/Makefile
@@ -7,7 +7,7 @@ SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = _build

-export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9.1-src.zip)
+export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9.2-src.zip)

# User-friendly check for sphinx-build
ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)
Binary file not shown.
9 changes: 3 additions & 6 deletions python/pyspark/streaming/flume.py
@@ -111,12 +111,9 @@ def func(event):
@staticmethod
def _get_helper(sc):
try:
-helperClass = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-.loadClass("org.apache.spark.streaming.flume.FlumeUtilsPythonHelper")
-return helperClass.newInstance()
-except Py4JJavaError as e:
-# TODO: use --jar once it also work on driver
-if 'ClassNotFoundException' in str(e.java_exception):
+return sc._jvm.org.apache.spark.streaming.flume.FlumeUtilsPythonHelper()
+except TypeError as e:
+if str(e) == "'JavaPackage' object is not callable":
FlumeUtils._printErrorMsg(sc)
raise

10 changes: 3 additions & 7 deletions python/pyspark/streaming/kafka.py
@@ -192,13 +192,9 @@ def funcWithMessageHandler(m):
@staticmethod
def _get_helper(sc):
try:
-# Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027)
-helperClass = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-.loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
-return helperClass.newInstance()
-except Py4JJavaError as e:
-# TODO: use --jar once it also work on driver
-if 'ClassNotFoundException' in str(e.java_exception):
+return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
+except TypeError as e:
+if str(e) == "'JavaPackage' object is not callable":
KafkaUtils._printErrorMsg(sc)
raise

14 changes: 6 additions & 8 deletions python/pyspark/streaming/kinesis.py
@@ -74,16 +74,14 @@ def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,

try:
# Use KinesisUtilsPythonHelper to access Scala's KinesisUtils
-helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
-.loadClass("org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper")
-helper = helperClass.newInstance()
-jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
-regionName, initialPositionInStream, jduration, jlevel,
-awsAccessKeyId, awsSecretKey)
-except Py4JJavaError as e:
-if 'ClassNotFoundException' in str(e.java_exception):
+helper = ssc._jvm.org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper()
+except TypeError as e:
+if str(e) == "'JavaPackage' object is not callable":
KinesisUtils._printErrorMsg(ssc.sparkContext)
raise
+jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
+regionName, initialPositionInStream, jduration, jlevel,
+awsAccessKeyId, awsSecretKey)
stream = DStream(jstream, ssc, NoOpSerializer())
return stream.map(lambda v: decoder(v))

13 changes: 5 additions & 8 deletions python/pyspark/streaming/mqtt.py
@@ -38,18 +38,15 @@ def createStream(ssc, brokerUrl, topic,
:param storageLevel: RDD storage level.
:return: A DStream object
"""
-jlevel = ssc._sc._getJavaStorageLevel(storageLevel)

try:
-helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-.loadClass("org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper")
-helper = helperClass.newInstance()
-jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
-except Py4JJavaError as e:
-if 'ClassNotFoundException' in str(e.java_exception):
+helper = ssc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper()
+except TypeError as e:
+if str(e) == "'JavaPackage' object is not callable":
MQTTUtils._printErrorMsg(ssc.sparkContext)
raise

+jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
return DStream(jstream, ssc, UTF8Deserializer())

@staticmethod
25 changes: 5 additions & 20 deletions python/pyspark/streaming/tests.py
@@ -1006,10 +1006,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):

def setUp(self):
super(KafkaStreamTests, self).setUp()

-kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
-.loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
-self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
+self._kafkaTestUtils = self.ssc._jvm.org.apache.spark.streaming.kafka.KafkaTestUtils()
self._kafkaTestUtils.setup()

def tearDown(self):
@@ -1271,10 +1268,7 @@ class FlumeStreamTests(PySparkStreamingTestCase):

def setUp(self):
super(FlumeStreamTests, self).setUp()

-utilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-.loadClass("org.apache.spark.streaming.flume.FlumeTestUtils")
-self._utils = utilsClz.newInstance()
+self._utils = self.ssc._jvm.org.apache.spark.streaming.flume.FlumeTestUtils()

def tearDown(self):
if self._utils is not None:
@@ -1339,10 +1333,7 @@ class FlumePollingStreamTests(PySparkStreamingTestCase):
maxAttempts = 5

def setUp(self):
-utilsClz = \
-self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-.loadClass("org.apache.spark.streaming.flume.PollingFlumeTestUtils")
-self._utils = utilsClz.newInstance()
+self._utils = self.sc._jvm.org.apache.spark.streaming.flume.PollingFlumeTestUtils()

def tearDown(self):
if self._utils is not None:
@@ -1419,10 +1410,7 @@ class MQTTStreamTests(PySparkStreamingTestCase):

def setUp(self):
super(MQTTStreamTests, self).setUp()

-MQTTTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-.loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils")
-self._MQTTTestUtils = MQTTTestUtilsClz.newInstance()
+self._MQTTTestUtils = self.ssc._jvm.org.apache.spark.streaming.mqtt.MQTTTestUtils()
self._MQTTTestUtils.setup()

def tearDown(self):
@@ -1498,10 +1486,7 @@ def test_kinesis_stream(self):

import random
kinesisAppName = ("KinesisStreamTests-%d" % abs(random.randint(0, 10000000)))
-kinesisTestUtilsClz = \
-self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-.loadClass("org.apache.spark.streaming.kinesis.KinesisTestUtils")
-kinesisTestUtils = kinesisTestUtilsClz.newInstance()
+kinesisTestUtils = self.ssc._jvm.org.apache.spark.streaming.kinesis.KinesisTestUtils()
try:
kinesisTestUtils.createStream()
aWSCredentials = kinesisTestUtils.getAWSCredentials()
2 changes: 1 addition & 1 deletion sbin/spark-config.sh
@@ -27,4 +27,4 @@ fi
export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
# Add the PySpark classes to the PYTHONPATH:
export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:${PYTHONPATH}"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.2-src.zip:${PYTHONPATH}"
4 changes: 2 additions & 2 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1087,9 +1087,9 @@ private[spark] class Client(
val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
require(pyArchivesFile.exists(),
"pyspark.zip not found; cannot run pyspark application in YARN mode.")
-val py4jFile = new File(pyLibPath, "py4j-0.9.1-src.zip")
+val py4jFile = new File(pyLibPath, "py4j-0.9.2-src.zip")
require(py4jFile.exists(),
"py4j-0.9.1-src.zip not found; cannot run pyspark application in YARN mode.")
"py4j-0.9.2-src.zip not found; cannot run pyspark application in YARN mode.")
Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
}
}
@@ -154,7 +154,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
// needed locations.
val sparkHome = sys.props("spark.test.home")
val pythonPath = Seq(
s"$sparkHome/python/lib/py4j-0.9.1-src.zip",
s"$sparkHome/python/lib/py4j-0.9.2-src.zip",
s"$sparkHome/python")
val extraEnv = Map(
"PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),
