Skip to content

Commit

Permalink
Fix python build.
Browse files Browse the repository at this point in the history
  • Loading branch information
milos.colic committed Feb 26, 2024
1 parent ff07073 commit 6d3bce6
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 8 deletions.
1 change: 1 addition & 0 deletions python/test/utils/spark_test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def setUpClass(cls) -> None:
.getOrCreate()
)
cls.spark.conf.set("spark.databricks.labs.mosaic.jar.autoattach", "false")
cls.spark.conf.set("spark.databricks.labs.mosaic.raster.tmp.prefix", "/")
cls.spark.sparkContext.setLogLevel("FATAL")

@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,10 +1025,20 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI) extends

object MosaicContext extends Logging {

val tmpDir: String = FileUtils.createMosaicTempDir()
var _tmpDir: String = ""
val mosaicVersion: String = "0.4.0"

private var instance: Option[MosaicContext] = None

def tmpDir(mosaicConfig: MosaicExpressionConfig): String = {
if (_tmpDir == "" || mosaicConfig == null) {
val prefix = mosaicConfig.getTmpPrefix
_tmpDir = FileUtils.createMosaicTempDir(prefix)
_tmpDir
} else {
_tmpDir
}
}

def build(indexSystem: IndexSystem, geometryAPI: GeometryAPI): MosaicContext = {
instance = Some(new MosaicContext(indexSystem, geometryAPI))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ case class MosaicExpressionConfig(configs: Map[String, String]) {
def getCellIdType: DataType = IndexSystemFactory.getIndexSystem(getIndexSystem).cellIdType

def getRasterBlockSize: Int = configs.getOrElse(MOSAIC_RASTER_BLOCKSIZE, MOSAIC_RASTER_BLOCKSIZE_DEFAULT).toInt

def getTmpPrefix: String = configs.getOrElse(MOSAIC_RASTER_TMP_PREFIX, "/tmp")

def setGDALConf(conf: RuntimeConfig): MosaicExpressionConfig = {
val toAdd = conf.getAll.filter(_._1.startsWith(MOSAIC_GDAL_PREFIX))
Expand All @@ -56,6 +58,10 @@ case class MosaicExpressionConfig(configs: Map[String, String]) {
def setRasterCheckpoint(checkpoint: String): MosaicExpressionConfig = {
MosaicExpressionConfig(configs + (MOSAIC_RASTER_CHECKPOINT -> checkpoint))
}

def setTmpPrefix(prefix: String): MosaicExpressionConfig = {
MosaicExpressionConfig(configs + (MOSAIC_RASTER_TMP_PREFIX -> prefix))
}

def setConfig(key: String, value: String): MosaicExpressionConfig = {
MosaicExpressionConfig(configs + (key -> value))
Expand All @@ -75,6 +81,7 @@ object MosaicExpressionConfig {
.setGeometryAPI(spark.conf.get(MOSAIC_GEOMETRY_API, JTS.name))
.setIndexSystem(spark.conf.get(MOSAIC_INDEX_SYSTEM, H3.name))
.setRasterCheckpoint(spark.conf.get(MOSAIC_RASTER_CHECKPOINT, MOSAIC_RASTER_CHECKPOINT_DEFAULT))
.setTmpPrefix(spark.conf.get(MOSAIC_RASTER_TMP_PREFIX, "/tmp"))
.setGDALConf(spark.conf)

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ object MosaicGDAL extends Logging {

/** Configures the GDAL environment. */
def configureGDAL(mosaicConfig: MosaicExpressionConfig): Unit = {
val CPL_TMPDIR = MosaicContext.tmpDir
val GDAL_PAM_PROXY_DIR = MosaicContext.tmpDir
val CPL_TMPDIR = MosaicContext.tmpDir(mosaicConfig)
val GDAL_PAM_PROXY_DIR = MosaicContext.tmpDir(mosaicConfig)
gdal.SetConfigOption("GDAL_VRT_ENABLE_PYTHON", "YES")
gdal.SetConfigOption("GDAL_DISABLE_READDIR_ON_OPEN", "TRUE")
gdal.SetConfigOption("CPL_TMPDIR", CPL_TMPDIR)
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/com/databricks/labs/mosaic/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package object mosaic {
val MOSAIC_GDAL_NATIVE = "spark.databricks.labs.mosaic.gdal.native"
val MOSAIC_RASTER_CHECKPOINT = "spark.databricks.labs.mosaic.raster.checkpoint"
val MOSAIC_RASTER_CHECKPOINT_DEFAULT = "/dbfs/tmp/mosaic/raster/checkpoint"
val MOSAIC_RASTER_TMP_PREFIX = "spark.databricks.labs.mosaic.raster.tmp.prefix"
val MOSAIC_RASTER_BLOCKSIZE = "spark.databricks.labs.mosaic.raster.blocksize"
val MOSAIC_RASTER_BLOCKSIZE_DEFAULT = "128"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object PathUtils {
}

def createTmpFilePath(extension: String): String = {
val tmpDir = MosaicContext.tmpDir
val tmpDir = MosaicContext.tmpDir(null)
val uuid = java.util.UUID.randomUUID.toString
val outPath = s"$tmpDir/raster_${uuid.replace("-", "_")}.$extension"
Files.createDirectories(Paths.get(outPath).getParent)
Expand Down Expand Up @@ -80,9 +80,9 @@ object PathUtils {
val fullFileName = copyFromPath.split("/").last
val stemRegex = getStemRegex(inPath)

wildcardCopy(inPathDir, MosaicContext.tmpDir, stemRegex.toString)
wildcardCopy(inPathDir, MosaicContext.tmpDir(null), stemRegex.toString)

s"${MosaicContext.tmpDir}/$fullFileName"
s"${MosaicContext.tmpDir(null)}/$fullFileName"
}

def wildcardCopy(inDirPath: String, outDirPath: String, pattern: String): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ trait SpatialKNNBehaviors { this: AnyFlatSpec =>

val boroughs: DataFrame = getBoroughs(mc)

val tempLocation = MosaicContext.tmpDir
val tempLocation = MosaicContext.tmpDir(null)
spark.sparkContext.setCheckpointDir(tempLocation)
spark.sparkContext.setLogLevel("ERROR")

Expand Down Expand Up @@ -94,7 +94,7 @@ trait SpatialKNNBehaviors { this: AnyFlatSpec =>

val boroughs: DataFrame = getBoroughs(mc)

val tempLocation = MosaicContext.tmpDir
val tempLocation = MosaicContext.tmpDir(null)
spark.sparkContext.setCheckpointDir(tempLocation)
spark.sparkContext.setLogLevel("ERROR")

Expand Down

0 comments on commit 6d3bce6

Please sign in to comment.