Skip to content

Commit

Permalink
materialize-snowflake: check pipe existence when insertFiles fails
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Sep 23, 2024
1 parent f83e726 commit 1a491aa
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 11 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ require (
github.com/microsoft/go-mssqldb v0.21.0
github.com/minio/highwayhash v1.0.2
github.com/mitchellh/mapstructure v1.5.0
github.com/pingcap/errors v0.11.5-0.20240311024730-e056997136bb
github.com/pkg/sftp v1.13.6
github.com/rockset/rockset-go-client v0.15.4
github.com/segmentio/encoding v0.4.0
Expand Down Expand Up @@ -186,6 +185,7 @@ require (
github.com/outcaste-io/ristretto v0.2.3 // indirect
github.com/philhofer/fwd v1.1.2 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pingcap/errors v0.11.5-0.20240311024730-e056997136bb // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
Expand Down
8 changes: 0 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,6 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
Expand Down Expand Up @@ -635,7 +634,6 @@ github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.0/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
Expand All @@ -658,8 +656,6 @@ github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q=
github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/marcboeker/go-duckdb v1.7.1 h1:m9/nKfP7cG9AptcQ95R1vfacRuhtrZE5pZF8BPUb/Iw=
github.com/marcboeker/go-duckdb v1.7.1/go.mod h1:2oV8BZv88S16TKGKM+Lwd0g7DX84x0jMxjTInThC8Is=
github.com/marcboeker/go-duckdb v1.8.0 h1:iOWv1wTL0JIMqpyns6hCf5XJJI4fY6lmJNk+itx5RRo=
github.com/marcboeker/go-duckdb v1.8.0/go.mod h1:2oV8BZv88S16TKGKM+Lwd0g7DX84x0jMxjTInThC8Is=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
Expand Down Expand Up @@ -697,7 +693,6 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE=
github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
Expand Down Expand Up @@ -897,8 +892,6 @@ go.etcd.io/etcd/client/v3 v3.5.14 h1:CWfRs4FDaDoSz81giL7zPpZH2Z35tbOrAJkkjMqOupg
go.etcd.io/etcd/client/v3 v3.5.14/go.mod h1:k3XfdV/VIHy/97rqWjoUzrj9tk7GgJGH9J8L4dNXmAk=
go.gazette.dev/core v0.89.1-0.20240418133910-d612fdcfd24a h1:pu/yxw6Lsv+HhkpRT/MldB2zfEKjKNBX5B/8on5rEoA=
go.gazette.dev/core v0.89.1-0.20240418133910-d612fdcfd24a/go.mod h1:TdjN2Q0A8NsL1C7hFjbl8BzGKobyFNPmhJFi54GvNyU=
go.mongodb.org/mongo-driver v1.12.1 h1:nLkghSU8fQNaK7oUmDhQFsnrtcoNy7Z6LVFKsEecqgE=
go.mongodb.org/mongo-driver v1.12.1/go.mod h1:/rGBTebI3XYboVmgz+Wv3Bcbl3aD0QF9zl6kDDw18rQ=
go.mongodb.org/mongo-driver v1.16.1 h1:rIVLL3q0IHM39dvE+z2ulZLp9ENZKThVfuvN/IiN4l8=
go.mongodb.org/mongo-driver v1.16.1/go.mod h1:oB6AhJQvFQL4LEHyXi6aJzQJtBiTQHiAd83l0GdFaiw=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
Expand Down Expand Up @@ -1175,7 +1168,6 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
4 changes: 3 additions & 1 deletion materialize-snowflake/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,9 @@ func (c *PipeClient) InsertReport(pipeName string) (*InsertReportResponse, error
}

var w strings.Builder
c.insertReportTpl.Execute(&w, urlTemplate)
if err := c.insertReportTpl.Execute(&w, urlTemplate); err != nil {
panic(err)
}
var url = w.String()

req, err := http.NewRequest("GET", url, nil)
Expand Down
17 changes: 16 additions & 1 deletion materialize-snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,9 +681,24 @@ func (d *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error
if resp, err := d.pipeClient.InsertFiles(item.PipeName, fileRequests); err != nil {
var insertErr InsertFilesError
if errors.As(err, &insertErr) && insertErr.Code == "390404" && strings.Contains(insertErr.Message, "Pipe not found") {
log.WithField("pipeName", item.PipeName).Debug("pipe does not exist, skipping this checkpoint item")
// This error can happen if either the pipe was not found, or we are not authorized to use it
// It is possible to not be authorized to use the pipe even though we have created it. When creating the pipe
// we specify the role by which to create the pipe, but when using Snowpipe, we can't specify a role. Rather, the
// default role of the user must have access to use the pipe. So here we make an additional check to make sure
// the user has access to the pipe directly, or it has a default role which has access, otherwise
// we advise the user to set a default role.
var pipeNameParts = strings.Split(item.PipeName, ".")
var pipeNameLastPart = pipeNameParts[len(pipeNameParts)-1]

if exists, err := d.pipeExists(ctx, pipeNameLastPart); err != nil {
return nil, fmt.Errorf("checking pipe existence %q: %w", item.PipeName, err)
} else if exists {
return nil, fmt.Errorf("pipe exists %q but Snowpipe cannot access it. This is most likely because the user does not have access to pipes through its default role. Try setting the default role of the user:\nALTER USER %s SET DEFAULT_ROLE=%s", pipeNameLastPart, d.cfg.Credentials.User, d.cfg.Role)
}

// Pipe was not found for this checkpoint item. We take this to mean that this item has already
// been processed and the pipe has been cleaned up by us in a prior Acknowledge
log.WithField("pipeName", item.PipeName).Info("pipe does not exist, skipping this checkpoint item")
continue
}
return nil, fmt.Errorf("snowpipe insertFiles: %w", err)
Expand Down

0 comments on commit 1a491aa

Please sign in to comment.