From b04dbcd347d2132f37a651ad074686e94f824a9e Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Thu, 9 Jan 2025 11:30:03 +0000 Subject: [PATCH] materialize-databricks: check for duplicates before and after query --- materialize-databricks/driver.go | 91 ++++++++++++++++++++------------ 1 file changed, 58 insertions(+), 33 deletions(-) diff --git a/materialize-databricks/driver.go b/materialize-databricks/driver.go index 811418149..7239f0949 100644 --- a/materialize-databricks/driver.go +++ b/materialize-databricks/driver.go @@ -261,11 +261,25 @@ func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage) } } else { switch v := converted[0].(type) { - case int: - if v < bmx.Min.(int) { + case int64: + if v < bmx.Min.(int64) { bmx.Min = v } - if v > bmx.Max.(int) { + if v > bmx.Max.(int64) { + bmx.Max = v + } + case uint64: + if v < bmx.Min.(uint64) { + bmx.Min = v + } + if v > bmx.Max.(uint64) { + bmx.Max = v + } + case float64: + if v < bmx.Min.(float64) { + bmx.Min = v + } + if v > bmx.Max.(float64) { bmx.Max = v } case string: @@ -324,7 +338,6 @@ func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage) } defer rows.Close() d.be.FinishedEvaluatingLoads() - log.WithField("loadQuery", unionQuery) var loadedCounts = make(map[string]int) for rows.Next() { @@ -474,6 +487,41 @@ func (d *transactor) Store(it *m.StoreIterator) (_ m.StartCommitFunc, err error) }, nil } +func (d *transactor) checkForDuplicates(ctx context.Context, db *stdsql.DB, stateKey string, item *checkpointItem, logMessage string) error { + var binding = d.bindingForStateKey(stateKey) + var keyStrings []string + for _, k := range binding.target.Keys { + keyStrings = append(keyStrings, k.Identifier) + } + var duplicatesQuery = fmt.Sprintf(`select %s from %s group by %s having count(*) > 1`, strings.Join(keyStrings, ","), binding.target.Identifier, strings.Join(keyStrings, ",")) + rows, err := db.QueryContext(ctx, duplicatesQuery) + if err != nil { + return fmt.Errorf("finding %q duplicates: %w", duplicatesQuery, err) + } + defer rows.Close() + var duplicates [][]any + for rows.Next() { + var row = make([]any, len(binding.target.Keys)) + + var rowReferences = make([]any, len(row)) + for i := 0; i < len(row); i++ { + rowReferences[i] = &row[i] + } + + if err := rows.Scan(rowReferences...); err != nil { + return fmt.Errorf("scanning duplicates: %w", err) + } + + duplicates = append(duplicates, row) + } + + if len(duplicates) > 0 { + log.WithFields(log.Fields{"query": item.Queries, "fullPaths": item.ToDelete, "duplicates": duplicates}).Info(logMessage) + } + + return nil +} + // Acknowledge merges data from temporary table to main table // TODO: run these queries concurrently for improved performance func (d *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error) { @@ -495,6 +543,10 @@ func (d *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error continue } + if err := d.checkForDuplicates(ctx, db, stateKey, item, "before running query"); err != nil { + return nil, err + } + var queries = item.Queries if item.Query != "" { if len(queries) != 0 { @@ -517,35 +569,8 @@ func (d *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error } - var binding = d.bindingForStateKey(stateKey) - var keyStrings []string - for _, k := range binding.target.Keys { - keyStrings = append(keyStrings, k.Identifier) - } - var duplicatesQuery = fmt.Sprintf(`select %s from %s group by %s having count(*) > 1`, strings.Join(keyStrings, ","), binding.target.Identifier, strings.Join(keyStrings, ",")) - rows, err := db.QueryContext(ctx, duplicatesQuery) - if err != nil { - return nil, fmt.Errorf("finding %q duplicates: %w", duplicatesQuery, err) - } - defer rows.Close() - var duplicates [][]any - for rows.Next() { - var row = make([]any, len(binding.target.Keys)) - - var rowReferences = make([]any, len(row)) - for i := 0; i < len(row); i++ { - rowReferences[i] = &row[i] - } - - if err := rows.Scan(rowReferences...); err != nil { - return nil, fmt.Errorf("scanning duplicates: %w", err) - } - - duplicates = append(duplicates, row) - } - - if len(duplicates) > 0 { - log.WithFields(log.Fields{"query": item.Queries, "fullPaths": item.ToDelete, "duplicates": duplicates}).Info("acknowledge") + if err := d.checkForDuplicates(ctx, db, stateKey, item, "after running query"); err != nil { + return nil, err } d.be.FinishedResourceCommit(path)