Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Catch null JDBC connection properties #569

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ private[snowflake] object ServerConnection {
serverConnection
}

private def putProperty(jdbcProperties: Properties, key: Object, value: Object): Unit = {
if (value != null) {
jdbcProperties.put(key, value)
} else {
throw new IllegalArgumentException(s"Invalid JDBC property: ${key.toString}")
}
}

private def createJDBCConnection(params: MergedParameters): Connection = {
// Derive class name
val driverClassName = JDBC_DRIVER
Expand All @@ -155,42 +163,42 @@ private[snowflake] object ServerConnection {
val jdbcProperties = new Properties()

// Obligatory properties
jdbcProperties.put("db", params.sfDatabase)
jdbcProperties.put("schema", params.sfSchema) // Has a default
putProperty(jdbcProperties, "db", params.sfDatabase)
putProperty(jdbcProperties, "schema", params.sfSchema) // Has a default
if (params.sfUser != null) {
// user is optional when using Oauth token
jdbcProperties.put("user", params.sfUser)
}
params.privateKey match {
case Some(privateKey) =>
jdbcProperties.put("privateKey", privateKey)
putProperty(jdbcProperties, "privateKey", privateKey)
case None =>
// Adding OAuth Token parameter
params.sfToken match {
case Some(value) =>
jdbcProperties.put("token", value)
case None => jdbcProperties.put("password", params.sfPassword)
putProperty(jdbcProperties, "token", value)
case None => putProperty(jdbcProperties, "password", params.sfPassword)
}
}
jdbcProperties.put("ssl", params.sfSSL) // Has a default
putProperty(jdbcProperties, "ssl", params.sfSSL) // Has a default
// Optional properties
if (params.sfAccount.isDefined) {
jdbcProperties.put("account", params.sfAccount.get)
putProperty(jdbcProperties, "account", params.sfAccount.get)
}
if (params.sfWarehouse.isDefined) {
jdbcProperties.put("warehouse", params.sfWarehouse.get)
putProperty(jdbcProperties, "warehouse", params.sfWarehouse.get)
}
if (params.sfRole.isDefined) {
jdbcProperties.put("role", params.sfRole.get)
putProperty(jdbcProperties, "role", params.sfRole.get)
}
params.getTimeOutputFormat match {
case Some(value) =>
jdbcProperties.put(Parameters.PARAM_TIME_OUTPUT_FORMAT, value)
putProperty(jdbcProperties, Parameters.PARAM_TIME_OUTPUT_FORMAT, value)
case _ => // No default value for it.
}
params.getQueryResultFormat match {
case Some(value) =>
jdbcProperties.put(Parameters.PARAM_JDBC_QUERY_RESULT_FORMAT, value)
putProperty(jdbcProperties, Parameters.PARAM_JDBC_QUERY_RESULT_FORMAT, value)
case _ => // No default value for it.
}

Expand All @@ -200,25 +208,25 @@ private[snowflake] object ServerConnection {
// Adding Authenticator parameter
params.sfAuthenticator match {
case Some(value) =>
jdbcProperties.put("authenticator", value)
putProperty(jdbcProperties, "authenticator", value)
case _ => // No default value for it.
}

// Always set CLIENT_SESSION_KEEP_ALIVE.
// Note, can be overridden with options
jdbcProperties.put("client_session_keep_alive", "true")
putProperty(jdbcProperties, "client_session_keep_alive", "true")

// Force DECIMAL for NUMBER (SNOW-33227)
if (params.treatDecimalAsLong) {
jdbcProperties.put("JDBC_TREAT_DECIMAL_AS_INT", "true")
putProperty(jdbcProperties, "JDBC_TREAT_DECIMAL_AS_INT", "true")
} else {
jdbcProperties.put("JDBC_TREAT_DECIMAL_AS_INT", "false")
putProperty(jdbcProperties, "JDBC_TREAT_DECIMAL_AS_INT", "false")
}

// Add extra properties from sfOptions
val extraOptions = params.sfExtraOptions
for ((k: String, v: Object) <- extraOptions) {
jdbcProperties.put(k.toLowerCase, v.toString)
putProperty(jdbcProperties, k.toLowerCase, v.toString)
}

// Only one JDBC version is certified for each spark connector.
Expand All @@ -234,7 +242,8 @@ private[snowflake] object ServerConnection {
// For more details, refer to PROPERTY_NAME_OF_CONNECTOR_VERSION
// NOTE: From JDBC 3.13.3, the client info can be set with JDBC properties
// instead of system property: "snowflake.client.info".
jdbcProperties.put(SFSessionProperty.CLIENT_INFO.getPropertyKey, Utils.getClientInfoString())
putProperty(jdbcProperties, SFSessionProperty.CLIENT_INFO.getPropertyKey,
Utils.getClientInfoString())

val conn: Connection = DriverManager.getConnection(jdbcURL, jdbcProperties)

Expand Down