Use SessionState to load Hadoop conf (apache#642)
aokolnychyi authored and rdblue committed Nov 13, 2019
1 parent 96fb5ef commit 6f28abf
Showing 8 changed files with 12 additions and 12 deletions.
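For orientation, every change below swaps the SparkContext's shared Hadoop configuration for `spark.sessionState.newHadoopConf()`, which returns a fresh copy of that configuration with the current session's settings (anything set via `spark.conf.set(...)` or SQL `SET`) layered on top. A minimal Scala sketch of the difference, assuming only an active `SparkSession` named `spark`; the `example.only.setting` key is made up for illustration:

```scala
import org.apache.hadoop.conf.Configuration

// Shared, mutable configuration owned by the SparkContext; changes to it are
// visible to every session and job running on the context.
val sharedConf: Configuration = spark.sparkContext.hadoopConfiguration

// Session-local setting; this is stored in the session's SQL conf, not in the
// shared Hadoop configuration above. (Hypothetical key, for illustration only.)
spark.conf.set("example.only.setting", "session-value")

// Fresh copy that starts from the shared conf and layers the session's settings
// on top, so per-session overrides are respected and later mutations stay local.
val sessionConf: Configuration = spark.sessionState.newHadoopConf()

assert(sessionConf.get("example.only.setting") == "session-value")
assert(sharedConf.get("example.only.setting") == null)
```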
4 changes: 2 additions & 2 deletions examples/Convert table to Iceberg.ipynb
@@ -143,7 +143,7 @@
"{ // use a block to avoid values (conf, etc.) getting caught in closures\n",
"\n",
" // remove the temp table if it already exists\n",
" val conf = spark.sparkContext.hadoopConfiguration\n",
" val conf = spark.sessionState.newHadoopConf()\n",
" val fs = new Path(path).getFileSystem(conf)\n",
" fs.delete(new Path(path), true /* recursive */ )\n",
"\n",
@@ -338,7 +338,7 @@
}
],
"source": [
"val tables = new HadoopTables(spark.sparkContext.hadoopConfiguration)\n",
"val tables = new HadoopTables(spark.sessionState.newHadoopConf())\n",
"val table = tables.load(path)\n",
"\n",
"table.currentSnapshot"
2 changes: 1 addition & 1 deletion site/docs/api-quickstart.md
@@ -28,7 +28,7 @@ The Hive catalog connects to a Hive MetaStore to keep track of Iceberg tables.
```scala
import org.apache.iceberg.hive.HiveCatalog

-val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration)
+val catalog = new HiveCatalog(spark.sessionState.newHadoopConf())
```

The `Catalog` interface defines methods for working with tables, like `createTable`, `loadTable`, `renameTable`, and `dropTable`.
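As a usage note beyond this diff, here is a short sketch of how a catalog built this way is typically used, following the `createTable` and `loadTable` methods named above; the schema, partition spec, and `db.sample` identifier are illustrative, not part of the commit:

```scala
import org.apache.iceberg.{PartitionSpec, Schema}
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.types.Types

// Build the catalog from the session-aware Hadoop conf, as in the quickstart above.
val catalog = new HiveCatalog(spark.sessionState.newHadoopConf())

// Illustrative schema and partition spec; any Iceberg schema works here.
val schema = new Schema(
  Types.NestedField.required(1, "id", Types.LongType.get()),
  Types.NestedField.optional(2, "data", Types.StringType.get()))
val spec = PartitionSpec.builderFor(schema).bucket("id", 16).build()

val name = TableIdentifier.of("db", "sample")  // hypothetical database and table name
val table = catalog.createTable(name, schema, spec)

// Later, the same identifier loads the table back from the metastore.
val loaded = catalog.loadTable(name)
```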
(next changed file)
@@ -97,7 +97,7 @@ protected void setupSpark() {
.config("spark.ui.enabled", false)
.master("local")
.getOrCreate();
-Configuration sparkHadoopConf = spark.sparkContext().hadoopConfiguration();
+Configuration sparkHadoopConf = spark.sessionState().newHadoopConf();
hadoopConf.forEach(entry -> sparkHadoopConf.set(entry.getKey(), entry.getValue()));
}

(next changed file)
@@ -137,7 +137,7 @@ private SparkSession lazySparkSession() {

private Configuration lazyBaseConf() {
if (lazyConf == null) {
-this.lazyConf = lazySparkSession().sparkContext().hadoopConfiguration();
+this.lazyConf = lazySparkSession().sessionState().newHadoopConf();
}
return lazyConf;
}
(next changed file)
@@ -240,7 +240,7 @@ public void testNullableWithWriteOption() throws IOException {
.write().parquet(sourcePath);

// this is our iceberg dataset to which we will append data
-new HadoopTables(spark.sparkContext().hadoopConfiguration())
+new HadoopTables(spark.sessionState().newHadoopConf())
.create(
icebergSchema,
PartitionSpec.builderFor(icebergSchema).identity("requiredField").build(),
@@ -290,7 +290,7 @@ public void testNullableWithSparkSqlOption() throws IOException {
.getOrCreate();

// this is our iceberg dataset to which we will append data
-new HadoopTables(newSparkSession.sparkContext().hadoopConfiguration())
+new HadoopTables(newSparkSession.sessionState().newHadoopConf())
.create(
icebergSchema,
PartitionSpec.builderFor(icebergSchema).identity("requiredField").build(),
(next changed file)
@@ -133,7 +133,7 @@ public void testNoWriteFormatOption() throws IOException {
@Test
public void testHadoopOptions() throws IOException {
String tableLocation = temp.newFolder("iceberg-table").toString();
-Configuration sparkHadoopConf = spark.sparkContext().hadoopConfiguration();
+Configuration sparkHadoopConf = spark.sessionState().newHadoopConf();
String originalDefaultFS = sparkHadoopConf.get("fs.default.name");

try {
(next changed file)
@@ -117,7 +117,7 @@ public void testNullPartitionValue() throws Exception {
File dataFolder = new File(location, "data");
Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());

-HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
Table table = tables.create(SIMPLE_SCHEMA, SPEC, location.toString());
table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();

@@ -160,7 +160,7 @@ public void testPartitionValueTypes() throws Exception {
"b", "i", "l", "f", "d", "date", "ts", "s", "bytes", "dec_9_0", "dec_11_2", "dec_38_10"
};

-HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());

// create a table around the source data
String sourceLocation = temp.newFolder("source_table").toString();
(next changed file)
@@ -144,7 +144,7 @@ public void testImportPartitionedTable() throws Exception {
.saveAsTable("test_partitioned_table");
TableIdentifier source = spark.sessionState().sqlParser()
.parseTableIdentifier("test_partitioned_table");
-HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
Table table = tables.create(SparkSchemaUtil.schemaForTable(spark, qualifiedTableName),
SparkSchemaUtil.specForTable(spark, qualifiedTableName),
ImmutableMap.of(),
@@ -162,7 +162,7 @@ public void testImportUnpartitionedTable() throws Exception {
.saveAsTable("test_unpartitioned_table");
TableIdentifier source = spark.sessionState().sqlParser()
.parseTableIdentifier("test_unpartitioned_table");
-HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
Table table = tables.create(SparkSchemaUtil.schemaForTable(spark, qualifiedTableName),
SparkSchemaUtil.specForTable(spark, qualifiedTableName),
ImmutableMap.of(),
