From 9524d72505638e758c709e6976ae94937adf5643 Mon Sep 17 00:00:00 2001 From: sfc-gh-jfan Date: Wed, 27 Apr 2022 14:37:24 -0700 Subject: [PATCH 1/4] PRODSEC-1262 Whitesource Transition to Snyk --- .github/workflows/snyk-issue.yml | 25 +++++++++++++++++++++++++ .github/workflows/snyk-pr.yml | 29 +++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) create mode 100644 .github/workflows/snyk-issue.yml create mode 100644 .github/workflows/snyk-pr.yml diff --git a/.github/workflows/snyk-issue.yml b/.github/workflows/snyk-issue.yml new file mode 100644 index 00000000..b54275bf --- /dev/null +++ b/.github/workflows/snyk-issue.yml @@ -0,0 +1,25 @@ +name: Snyk Issue + +on: + issues: + types: [opened, reopened] + +concurrency: snyk-issue + +jobs: + whitesource: + runs-on: ubuntu-latest + if: ${{ github.event.issue.user.login == 'sfc-gh-snyk-sca-sa' }} + steps: + - name: checkout action + uses: actions/checkout@v2 + with: + repository: snowflakedb/whitesource-actions + token: ${{ secrets.WHITESOURCE_ACTION_TOKEN }} + path: whitesource-actions + + - name: Jira Creation + uses: ./whitesource-actions/snyk-issue + with: + jira_token: ${{ secrets.JIRA_TOKEN }} + gh_token: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/.github/workflows/snyk-pr.yml b/.github/workflows/snyk-pr.yml new file mode 100644 index 00000000..0fb62a81 --- /dev/null +++ b/.github/workflows/snyk-pr.yml @@ -0,0 +1,29 @@ +name: snyk-pr +on: + pull_request: + branches: + - master +jobs: + whitesource: + runs-on: ubuntu-latest + if: ${{ github.event.pull_request.user.login == 'sfc-gh-snyk-sca-sa' }} + steps: + - name: checkout + uses: actions/checkout@v2 + with: + ref: ${{ github.event.pull_request.head.ref }} + fetch-depth: 0 + + - name: checkout action + uses: actions/checkout@v2 + with: + repository: snowflakedb/whitesource-actions + token: ${{ secrets.WHITESOURCE_ACTION_TOKEN }} + path: whitesource-actions + + - name: PR + uses: ./whitesource-actions/snyk-pr + with: + jira_token: ${{ secrets.JIRA_TOKEN }} + gh_token: ${{ secrets.GITHUB_TOKEN }} + amend: false # true if you want the commit to be amended with the JIRA number \ No newline at end of file From b1a9a2b38930aea4379494e08c3a260166849b7d Mon Sep 17 00:00:00 2001 From: sfc-gh-jfan Date: Wed, 27 Apr 2022 16:21:01 -0700 Subject: [PATCH 2/4] minor update --- .github/workflows/snyk-issue.yml | 14 +++++++++----- .github/workflows/snyk-pr.yml | 4 +++- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/.github/workflows/snyk-issue.yml b/.github/workflows/snyk-issue.yml index b54275bf..9641ba37 100644 --- a/.github/workflows/snyk-issue.yml +++ b/.github/workflows/snyk-issue.yml @@ -1,15 +1,14 @@ name: Snyk Issue on: - issues: - types: [opened, reopened] + schedule: + - cron: '* */12 * * *' concurrency: snyk-issue jobs: whitesource: runs-on: ubuntu-latest - if: ${{ github.event.issue.user.login == 'sfc-gh-snyk-sca-sa' }} steps: - name: checkout action uses: actions/checkout@v2 @@ -17,9 +16,14 @@ jobs: repository: snowflakedb/whitesource-actions token: ${{ secrets.WHITESOURCE_ACTION_TOKEN }} path: whitesource-actions - + - name: set-env + run: echo "REPO=$(basename $GITHUB_REPOSITORY)" >> $GITHUB_ENV - name: Jira Creation uses: ./whitesource-actions/snyk-issue with: + snyk_org: 91a1c3d0-0729-4a98-8210-28087460f1e3 + snyk_token: ${{ secrets.SNYK_GITHUB_INTEGRATION_TOKEN }} jira_token: ${{ secrets.JIRA_TOKEN }} - gh_token: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + diff --git a/.github/workflows/snyk-pr.yml b/.github/workflows/snyk-pr.yml index 0fb62a81..02b5f173 100644 --- a/.github/workflows/snyk-pr.yml +++ b/.github/workflows/snyk-pr.yml @@ -23,7 +23,9 @@ jobs: - name: PR uses: ./whitesource-actions/snyk-pr + env: + PR_TITLE: ${{ github.event.pull_request.title }} with: jira_token: ${{ secrets.JIRA_TOKEN }} gh_token: ${{ secrets.GITHUB_TOKEN }} - amend: false # true if you want the commit to be amended with the JIRA number \ No newline at end of file + amend: false # true if you want the commit to be amended with the JIRA number From 4fd9f5f578c99ea9da3fd84c4e737e9841878ba0 Mon Sep 17 00:00:00 2001 From: sfc-gh-jfan Date: Wed, 4 May 2022 15:36:16 -0700 Subject: [PATCH 3/4] minor update --- .github/workflows/snyk-issue.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/snyk-issue.yml b/.github/workflows/snyk-issue.yml index 9641ba37..d2ef124d 100644 --- a/.github/workflows/snyk-issue.yml +++ b/.github/workflows/snyk-issue.yml @@ -21,7 +21,7 @@ jobs: - name: Jira Creation uses: ./whitesource-actions/snyk-issue with: - snyk_org: 91a1c3d0-0729-4a98-8210-28087460f1e3 + snyk_org: ${{ secrets.SNYK_ORG_ID }} snyk_token: ${{ secrets.SNYK_GITHUB_INTEGRATION_TOKEN }} jira_token: ${{ secrets.JIRA_TOKEN }} env: From 7aa88594a62d6bb506aceaa4e5ee0b12ccfc1138 Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Tue, 21 Jun 2022 11:14:03 +0800 Subject: [PATCH 4/4] Support merge into operation --- .../spark/snowflake/Parameters.scala | 12 ++++- .../snowflake/SnowflakeJDBCWrapper.scala | 16 ++++++ .../spark/snowflake/io/StageWriter.scala | 50 ++++++++++++++----- 3 files changed, 65 insertions(+), 13 deletions(-) diff --git a/src/main/scala/net/snowflake/spark/snowflake/Parameters.scala b/src/main/scala/net/snowflake/spark/snowflake/Parameters.scala index 3be25488..e7c6f67e 100644 --- a/src/main/scala/net/snowflake/spark/snowflake/Parameters.scala +++ b/src/main/scala/net/snowflake/spark/snowflake/Parameters.scala @@ -190,6 +190,13 @@ object Parameters { "internal_use_aws_region_url" ) + val PARAM_UNIQUE_KEYS: String = knownParam( + "unique.keys" + ) + val PARAM_INSERTED_COLUMNS: String = knownParam( + "inserted.columns" + ) + val DEFAULT_S3_MAX_FILE_SIZE: String = (10 * 1000 * 1000).toString val MIN_S3_MAX_FILE_SIZE = 1000000 @@ -577,6 +584,10 @@ object Parameters { parameters.get(PARAM_SF_ACCOUNT) } + def uniqueKeys: String = parameters.getOrElse(PARAM_UNIQUE_KEYS, "") + + def insertedColumns: String = parameters.getOrElse(PARAM_INSERTED_COLUMNS, "") + /** * Snowflake SSL on/off - "on" by default */ @@ -625,7 +636,6 @@ object Parameters { def nonProxyHosts: Option[String] = { parameters.get(PARAM_NON_PROXY_HOSTS) } - /** * Mapping OAuth and authenticator values */ diff --git a/src/main/scala/net/snowflake/spark/snowflake/SnowflakeJDBCWrapper.scala b/src/main/scala/net/snowflake/spark/snowflake/SnowflakeJDBCWrapper.scala index 4e2b1114..f5f840f0 100644 --- a/src/main/scala/net/snowflake/spark/snowflake/SnowflakeJDBCWrapper.scala +++ b/src/main/scala/net/snowflake/spark/snowflake/SnowflakeJDBCWrapper.scala @@ -518,6 +518,22 @@ private[snowflake] object DefaultJDBCWrapper extends JDBCWrapper { .execute(bindVariableEnabled)(connection) } + def mergeIntoTable(tempTable: String, + targetTable: String, + keys: Array[String], + insertedColumns: Array[String], + bindVariableEnabled: Boolean = true): Unit = { + val joinExpr = keys.map(key => s"$targetTable.$key=$tempTable.$key").mkString(" and ") + val updateExpr = insertedColumns.map(c => s"$targetTable.$c=$tempTable.$c").mkString(",") + val matchExpr = s"when matched then update set $updateExpr" + val insertedColumnStr = insertedColumns.mkString(",") + val insertExpr = insertedColumns.map(key => s"$tempTable.$key").mkString(",") + val notMatchExpr = s"when not matched then insert($insertedColumnStr)values($insertExpr)" + (ConstantString("merge into") + Identifier(targetTable) + "using" + + Identifier(tempTable) + "on" + joinExpr + matchExpr + notMatchExpr) + .execute(bindVariableEnabled)(connection) + } + def truncateTable(table: String, bindVariableEnabled: Boolean = true): Unit = (ConstantString("truncate") + table) diff --git a/src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala b/src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala index 4e7757bc..ad639a39 100644 --- a/src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala +++ b/src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala @@ -418,18 +418,44 @@ private[io] object StageWriter { ) // Execute COPY INTO TABLE to load data - StageWriter.executeCopyIntoTable( - sqlContext, - conn, - schema, - saveMode, - params, - targetTable, - file, - tempStage, - format, - fileUploadResults) - + // update data by merging syntax + val keyStr: String = sqlContext.getConf("spark.sql.unique.keys", params.uniqueKeys) + val insertedColumnStr: String = sqlContext.getConf("spark.sql.inserted.columns", params.insertedColumns) + if(!tempTable.equals(targetTable) && !keyStr.isEmpty && !insertedColumnStr.isEmpty){ + conn.createTable(tempTable.name, schema, params, + overwrite = true, temporary = true) + StageWriter.executeCopyIntoTable( + sqlContext, + conn, + schema, + saveMode, + params, + tempTable, + file, + tempStage, + format, + fileUploadResults + ) + val keys = keyStr.split(',') + val insertedColumns = insertedColumnStr.split(',') + conn.mergeIntoTable(tempTable.name, + targetTable.name, + keys, + insertedColumns) + } else { + StageWriter.executeCopyIntoTable( + sqlContext, + conn, + schema, + saveMode, + params, + targetTable, + file, + tempStage, + format, + fileUploadResults + ) + } // post actions Utils.executePostActions( DefaultJDBCWrapper,