diff --git a/src/it/scala/net/snowflake/spark/snowflake/io/StageSuite.scala b/src/it/scala/net/snowflake/spark/snowflake/io/StageSuite.scala index 1a310c75..e1d80cf3 100644 --- a/src/it/scala/net/snowflake/spark/snowflake/io/StageSuite.scala +++ b/src/it/scala/net/snowflake/spark/snowflake/io/StageSuite.scala @@ -311,16 +311,13 @@ class StageSuite extends IntegrationSuiteBase { try { // The credential for the external stage is fake. val azureExternalStage = ExternalAzureStorage( + param, containerName = "test_fake_container", azureAccount = "test_fake_account", azureEndpoint = "blob.core.windows.net", azureSAS = "?sig=test_test_test_test_test_test_test_test_test_test_test_test" + "_test_test_test_test_test_fak&spr=https&sp=rwdl&sr=c", - param.proxyInfo, - param.maxRetryCount, - param.sfURL, - param.useExponentialBackoff, param.expectedPartitionCount, pref = "test_dir", connection = connection @@ -367,13 +364,10 @@ class StageSuite extends IntegrationSuiteBase { try { // The credential for the external stage is fake. val s3ExternalStage = ExternalS3Storage( + param, bucketName = "test_fake_bucket", awsId = "TEST_TEST_TEST_TEST1", awsKey = "TEST_TEST_TEST_TEST_TEST_TEST_TEST_TEST2", - param.proxyInfo, - param.maxRetryCount, - param.sfURL, - param.useExponentialBackoff, param.expectedPartitionCount, pref = "test_dir", connection = connection, @@ -487,13 +481,10 @@ class StageSuite extends IntegrationSuiteBase { try { // The credential for the external stage is fake. val s3ExternalStage = ExternalS3Storage( + param, bucketName = "test_fake_bucket", awsId = "TEST_TEST_TEST_TEST1", awsKey = "TEST_TEST_TEST_TEST_TEST_TEST_TEST_TEST2", - param.proxyInfo, - param.maxRetryCount, - param.sfURL, - param.useExponentialBackoff, param.expectedPartitionCount, pref = "test_dir", connection = connection, diff --git a/src/main/scala/net/snowflake/spark/snowflake/io/CloudStorageOperations.scala b/src/main/scala/net/snowflake/spark/snowflake/io/CloudStorageOperations.scala index 23f31b93..bfc3c83a 100644 --- a/src/main/scala/net/snowflake/spark/snowflake/io/CloudStorageOperations.scala +++ b/src/main/scala/net/snowflake/spark/snowflake/io/CloudStorageOperations.scala @@ -290,14 +290,11 @@ object CloudStorageOperations { ( ExternalAzureStorage( + param = param, containerName = container, azureAccount = account, azureEndpoint = endpoint, azureSAS = azureSAS, - param.proxyInfo, - param.maxRetryCount, - param.sfURL, - param.useExponentialBackoff, param.expectedPartitionCount, pref = path, connection = conn @@ -321,13 +318,10 @@ object CloudStorageOperations { ( ExternalS3Storage( + param = param, bucketName = bucket, awsId = param.awsAccessKey.get, awsKey = param.awsSecretKey.get, - param.proxyInfo, - param.maxRetryCount, - param.sfURL, - param.useExponentialBackoff, param.expectedPartitionCount, pref = prefix, connection = conn, @@ -495,14 +489,15 @@ private[io] object StorageInfo { } sealed trait CloudStorage { + protected val param: MergedParameters protected val RETRY_SLEEP_TIME_UNIT_IN_MS: Int = 1500 protected val MAX_SLEEP_TIME_IN_MS: Int = 3 * 60 * 1000 private var processedFileCount = 0 protected val connection: ServerConnection - protected val maxRetryCount: Int - protected val proxyInfo: Option[ProxyInfo] - protected val sfURL: String - protected val useExponentialBackoff: Boolean + protected val maxRetryCount: Int = param.maxRetryCount + protected val proxyInfo: Option[ProxyInfo] = param.proxyInfo + protected val sfURL: String = param.sfURL + protected val useExponentialBackoff: Boolean = param.useExponentialBackoff // The first 10 sleep time in second will be like // 3, 6, 12, 24, 48, 96, 192, 300, 300, 300, etc @@ -921,15 +916,11 @@ sealed trait CloudStorage { def fileExists(fileName: String): Boolean } -case class InternalAzureStorage(param: MergedParameters, +case class InternalAzureStorage(override protected val param: MergedParameters, stageName: String, @transient override val connection: ServerConnection) extends CloudStorage { - override val maxRetryCount = param.maxRetryCount - override val proxyInfo: Option[ProxyInfo] = param.proxyInfo - override val sfURL = param.sfURL - override val useExponentialBackoff = param.useExponentialBackoff override protected def getStageInfo( isWrite: Boolean, @@ -1150,14 +1141,11 @@ case class InternalAzureStorage(param: MergedParameters, } } -case class ExternalAzureStorage(containerName: String, +case class ExternalAzureStorage(override protected val param: MergedParameters, + containerName: String, azureAccount: String, azureEndpoint: String, azureSAS: String, - override val proxyInfo: Option[ProxyInfo], - override val maxRetryCount: Int, - override val sfURL: String, - override val useExponentialBackoff: Boolean, fileCountPerPartition: Int, pref: String = "", @transient override val connection: ServerConnection) @@ -1302,16 +1290,12 @@ case class ExternalAzureStorage(containerName: String, } } -case class InternalS3Storage(param: MergedParameters, +case class InternalS3Storage(override protected val param: MergedParameters, stageName: String, @transient override val connection: ServerConnection, parallelism: Int = CloudStorageOperations.DEFAULT_PARALLELISM) extends CloudStorage { - override val maxRetryCount = param.maxRetryCount - override val proxyInfo: Option[ProxyInfo] = param.proxyInfo - override val sfURL = param.sfURL - override val useExponentialBackoff = param.useExponentialBackoff override protected def getStageInfo( isWrite: Boolean, @@ -1550,13 +1534,10 @@ case class InternalS3Storage(param: MergedParameters, } } -case class ExternalS3Storage(bucketName: String, +case class ExternalS3Storage(override protected val param: MergedParameters, + bucketName: String, awsId: String, awsKey: String, - override val proxyInfo: Option[ProxyInfo], - override val maxRetryCount: Int, - override val sfURL: String, - override val useExponentialBackoff: Boolean, fileCountPerPartition: Int, awsToken: Option[String] = None, pref: String = "", @@ -1704,18 +1685,12 @@ case class ExternalS3Storage(bucketName: String, // Internal CloudStorage for GCS (Google Cloud Storage). // NOTE: External storage for GCS is not supported. -case class InternalGcsStorage(param: MergedParameters, +case class InternalGcsStorage(override protected val param: MergedParameters, stageName: String, @transient override val connection: ServerConnection, @transient stageManager: SFInternalStage) extends CloudStorage { - override val proxyInfo: Option[ProxyInfo] = param.proxyInfo - // Max retry count to upload a file - override val maxRetryCount: Int = param.maxRetryCount - override val sfURL = param.sfURL - override val useExponentialBackoff = param.useExponentialBackoff - // Generate file transfer metadata objects for file upload. On GCS, // the file transfer metadata is pre-signed URL and related metadata. // This function is called on Master node.