diff --git a/.github/workflows/snyk-issue.yml b/.github/workflows/snyk-issue.yml
new file mode 100644
index 00000000..d2ef124d
--- /dev/null
+++ b/.github/workflows/snyk-issue.yml
@@ -0,0 +1,29 @@
+name: Snyk Issue
+
+on:
+  schedule:
+    - cron: '* */12 * * *'
+
+concurrency: snyk-issue
+
+jobs:
+  whitesource:
+    runs-on: ubuntu-latest
+    steps:
+      - name: checkout action
+        uses: actions/checkout@v2
+        with:
+          repository: snowflakedb/whitesource-actions
+          token: ${{ secrets.WHITESOURCE_ACTION_TOKEN }}
+          path: whitesource-actions
+      - name: set-env
+        run: echo "REPO=$(basename $GITHUB_REPOSITORY)" >> $GITHUB_ENV
+      - name: Jira Creation
+        uses: ./whitesource-actions/snyk-issue
+        with:
+          snyk_org: ${{ secrets.SNYK_ORG_ID }}
+          snyk_token: ${{ secrets.SNYK_GITHUB_INTEGRATION_TOKEN }}
+          jira_token: ${{ secrets.JIRA_TOKEN }}
+        env:
+          GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+
diff --git a/.github/workflows/snyk-pr.yml b/.github/workflows/snyk-pr.yml
new file mode 100644
index 00000000..02b5f173
--- /dev/null
+++ b/.github/workflows/snyk-pr.yml
@@ -0,0 +1,31 @@
+name: snyk-pr
+on:
+  pull_request:
+    branches:
+      - master
+jobs:
+  whitesource:
+    runs-on: ubuntu-latest
+    if: ${{ github.event.pull_request.user.login == 'sfc-gh-snyk-sca-sa' }}
+    steps:
+      - name: checkout
+        uses: actions/checkout@v2
+        with:
+          ref: ${{ github.event.pull_request.head.ref }}
+          fetch-depth: 0
+
+      - name: checkout action
+        uses: actions/checkout@v2
+        with:
+          repository: snowflakedb/whitesource-actions
+          token: ${{ secrets.WHITESOURCE_ACTION_TOKEN }}
+          path: whitesource-actions
+
+      - name: PR
+        uses: ./whitesource-actions/snyk-pr
+        env:
+          PR_TITLE: ${{ github.event.pull_request.title }}
+        with:
+          jira_token: ${{ secrets.JIRA_TOKEN }}
+          gh_token: ${{ secrets.GITHUB_TOKEN }}
+          amend: false # true if you want the commit to be amended with the JIRA number
diff --git a/src/main/scala/net/snowflake/spark/snowflake/Parameters.scala b/src/main/scala/net/snowflake/spark/snowflake/Parameters.scala
index 3be25488..e7c6f67e 100644
--- a/src/main/scala/net/snowflake/spark/snowflake/Parameters.scala
+++ b/src/main/scala/net/snowflake/spark/snowflake/Parameters.scala
@@ -190,6 +190,13 @@ object Parameters {
     "internal_use_aws_region_url"
   )
 
+  val PARAM_UNIQUE_KEYS: String = knownParam(
+    "unique.keys"
+  )
+  val PARAM_INSERTED_COLUMNS: String = knownParam(
+    "inserted.columns"
+  )
+
   val DEFAULT_S3_MAX_FILE_SIZE: String = (10 * 1000 * 1000).toString
   val MIN_S3_MAX_FILE_SIZE = 1000000
 
@@ -577,6 +584,10 @@ object Parameters {
       parameters.get(PARAM_SF_ACCOUNT)
     }
 
+    def uniqueKeys: String = parameters.getOrElse(PARAM_UNIQUE_KEYS, "")
+
+    def insertedColumns: String = parameters.getOrElse(PARAM_INSERTED_COLUMNS, "")
+
     /**
      * Snowflake SSL on/off - "on" by default
      */
@@ -625,7 +636,6 @@ object Parameters {
     def nonProxyHosts: Option[String] = {
       parameters.get(PARAM_NON_PROXY_HOSTS)
     }
-
     /**
      * Mapping OAuth and authenticator values
      */
diff --git a/src/main/scala/net/snowflake/spark/snowflake/SnowflakeJDBCWrapper.scala b/src/main/scala/net/snowflake/spark/snowflake/SnowflakeJDBCWrapper.scala
index 4e2b1114..f5f840f0 100644
--- a/src/main/scala/net/snowflake/spark/snowflake/SnowflakeJDBCWrapper.scala
+++ b/src/main/scala/net/snowflake/spark/snowflake/SnowflakeJDBCWrapper.scala
@@ -518,6 +518,22 @@ private[snowflake] object DefaultJDBCWrapper extends JDBCWrapper {
         .execute(bindVariableEnabled)(connection)
     }
 
+    def mergeIntoTable(tempTable: String,
+                       targetTable: String,
+                       keys: Array[String],
+                       insertedColumns: Array[String],
+                       bindVariableEnabled: Boolean = true): Unit = {
+      val joinExpr = keys.map(key => s"$targetTable.$key=$tempTable.$key").mkString(" and ")
+      val updateExpr = insertedColumns.map(c => s"$targetTable.$c=$tempTable.$c").mkString(",")
+      val matchExpr = s"when matched then update set $updateExpr"
+      val insertedColumnStr = insertedColumns.mkString(",")
+      val insertExpr = insertedColumns.map(key => s"$tempTable.$key").mkString(",")
+      val notMatchExpr = s"when not matched then insert($insertedColumnStr)values($insertExpr)"
+      (ConstantString("merge into") + Identifier(targetTable) + "using" +
+        Identifier(tempTable) + "on" + joinExpr + matchExpr + notMatchExpr)
+        .execute(bindVariableEnabled)(connection)
+    }
+
     def truncateTable(table: String,
                       bindVariableEnabled: Boolean = true): Unit =
       (ConstantString("truncate") + table)
diff --git a/src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala b/src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala
index 4e7757bc..ad639a39 100644
--- a/src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala
+++ b/src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala
@@ -418,18 +418,44 @@ private[io] object StageWriter {
     )
 
     // Execute COPY INTO TABLE to load data
-    StageWriter.executeCopyIntoTable(
-      sqlContext,
-      conn,
-      schema,
-      saveMode,
-      params,
-      targetTable,
-      file,
-      tempStage,
-      format,
-      fileUploadResults)
-
+    // update data by merging syntax
+    val keyStr: String = sqlContext.getConf("spark.sql.unique.keys", params.uniqueKeys)
+    val insertedColumnStr: String = sqlContext.getConf("spark.sql.inserted.columns", params.insertedColumns)
+    if (!tempTable.equals(targetTable) && !keyStr.isEmpty && !insertedColumnStr.isEmpty) {
+      conn.createTable(tempTable.name, schema, params,
+        overwrite = true, temporary = true)
+      StageWriter.executeCopyIntoTable(
+        sqlContext,
+        conn,
+        schema,
+        saveMode,
+        params,
+        tempTable,
+        file,
+        tempStage,
+        format,
+        fileUploadResults
+      )
+      val keys = keyStr.split(',')
+      val insertedColumns = insertedColumnStr.split(',')
+      conn.mergeIntoTable(tempTable.name,
+        targetTable.name,
+        keys,
+        insertedColumns)
+    } else {
+      StageWriter.executeCopyIntoTable(
+        sqlContext,
+        conn,
+        schema,
+        saveMode,
+        params,
+        targetTable,
+        file,
+        tempStage,
+        format,
+        fileUploadResults
+      )
+    }
    // post actions
     Utils.executePostActions(
       DefaultJDBCWrapper,
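
Reviewer note on usage: the Scala changes above add a merge-based upsert path to the stage writer. When both `unique.keys` and `inserted.columns` are provided, either as connector options (`PARAM_UNIQUE_KEYS` / `PARAM_INSERTED_COLUMNS`) or through the session configs `spark.sql.unique.keys` / `spark.sql.inserted.columns` that `sqlContext.getConf` checks first, and the writer's temporary table differs from the target table, data is first loaded into a temporary table and then merged into the target instead of being copied directly. The sketch below shows how a caller might set these options; the connection settings, table name, and column names are placeholders and not part of this change, and whether the writer actually routes through a separate temporary table depends on save mode and staging-table settings outside this hunk.

```scala
// Minimal usage sketch, assuming placeholder connection settings and table/column
// names. Only "unique.keys" and "inserted.columns" are introduced by this diff.
import org.apache.spark.sql.{SaveMode, SparkSession}

object MergeWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("merge-write-example").getOrCreate()

    // Hypothetical connection options; replace with real account values.
    val sfOptions = Map(
      "sfURL" -> "<account>.snowflakecomputing.com",
      "sfUser" -> "<user>",
      "sfPassword" -> "<password>",
      "sfDatabase" -> "<database>",
      "sfSchema" -> "<schema>",
      "sfWarehouse" -> "<warehouse>"
    )

    // Placeholder source data containing the rows to upsert.
    val updates = spark.read.parquet("/tmp/updates.parquet")

    updates.write
      .format("net.snowflake.spark.snowflake")
      .options(sfOptions)
      .option("dbtable", "TARGET_TABLE")        // placeholder target table
      .option("unique.keys", "ID")              // join keys for the MERGE ... ON clause
      .option("inserted.columns", "ID,NAME,TS") // columns to update / insert
      .mode(SaveMode.Append)                    // save mode chosen for illustration only
      .save()

    // Alternatively, the same values can be supplied as session configs, which
    // sqlContext.getConf reads before falling back to the connector options:
    // spark.conf.set("spark.sql.unique.keys", "ID")
    // spark.conf.set("spark.sql.inserted.columns", "ID,NAME,TS")
  }
}
```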
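
For reference during review, the statement text assembled by `mergeIntoTable` has roughly the following shape. The standalone sketch below mirrors the same `mkString` logic with made-up table and column names and simply prints the result; the real method routes the pieces through the connector's `ConstantString` / `Identifier` query builder and executes them on the JDBC connection instead.

```scala
// Standalone sketch of the MERGE text built by mergeIntoTable, using
// hypothetical table and column names to show the generated statement shape.
object MergeSqlShape {
  def main(args: Array[String]): Unit = {
    val tempTable = "STAGE_TMP"
    val targetTable = "TARGET"
    val keys = Array("ID")
    val insertedColumns = Array("ID", "NAME", "TS")

    // Same expressions as in SnowflakeJDBCWrapper.mergeIntoTable.
    val joinExpr = keys.map(k => s"$targetTable.$k=$tempTable.$k").mkString(" and ")
    val updateExpr = insertedColumns.map(c => s"$targetTable.$c=$tempTable.$c").mkString(",")
    val insertCols = insertedColumns.mkString(",")
    val insertVals = insertedColumns.map(c => s"$tempTable.$c").mkString(",")

    println(
      s"merge into $targetTable using $tempTable on $joinExpr " +
        s"when matched then update set $updateExpr " +
        s"when not matched then insert($insertCols)values($insertVals)"
    )
    // Output (wrapped here for readability):
    //   merge into TARGET using STAGE_TMP on TARGET.ID=STAGE_TMP.ID
    //   when matched then update set TARGET.ID=STAGE_TMP.ID,TARGET.NAME=STAGE_TMP.NAME,TARGET.TS=STAGE_TMP.TS
    //   when not matched then insert(ID,NAME,TS)values(STAGE_TMP.ID,STAGE_TMP.NAME,STAGE_TMP.TS)
  }
}
```

One detail worth noting in review: because `inserted.columns` drives both the SET list and the INSERT list, the matched branch also rewrites the key columns themselves.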