Skip to content

Commit

Permalink
materialize-databricks: check for duplicates before and after query
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Jan 9, 2025
1 parent c33f43c commit b04dbcd
Showing 1 changed file with 58 additions and 33 deletions.
91 changes: 58 additions & 33 deletions materialize-databricks/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,25 @@ func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage)
}
} else {
switch v := converted[0].(type) {
case int:
if v < bmx.Min.(int) {
case int64:
if v < bmx.Min.(int64) {
bmx.Min = v
}
if v > bmx.Max.(int) {
if v > bmx.Max.(int64) {
bmx.Max = v
}
case uint64:
if v < bmx.Min.(uint64) {
bmx.Min = v
}
if v > bmx.Max.(uint64) {
bmx.Max = v
}
case float64:
if v < bmx.Min.(float64) {
bmx.Min = v
}
if v > bmx.Max.(float64) {
bmx.Max = v
}
case string:
Expand Down Expand Up @@ -324,7 +338,6 @@ func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage)
}
defer rows.Close()
d.be.FinishedEvaluatingLoads()
log.WithField("loadQuery", unionQuery)

var loadedCounts = make(map[string]int)
for rows.Next() {
Expand Down Expand Up @@ -474,6 +487,41 @@ func (d *transactor) Store(it *m.StoreIterator) (_ m.StartCommitFunc, err error)
}, nil
}

func (d *transactor) checkForDuplicates(ctx context.Context, db *stdsql.DB, stateKey string, item *checkpointItem, logMessage string) error {
var binding = d.bindingForStateKey(stateKey)
var keyStrings []string
for _, k := range binding.target.Keys {
keyStrings = append(keyStrings, k.Identifier)
}
var duplicatesQuery = fmt.Sprintf(`select %s from %s group by %s having count(*) > 1`, strings.Join(keyStrings, ","), binding.target.Identifier, strings.Join(keyStrings, ","))
rows, err := db.QueryContext(ctx, duplicatesQuery)
if err != nil {
return fmt.Errorf("finding %q duplicates: %w", duplicatesQuery, err)
}
defer rows.Close()
var duplicates [][]any
for rows.Next() {
var row = make([]any, len(binding.target.Keys))

var rowReferences = make([]any, len(row))
for i := 0; i < len(row); i++ {
rowReferences[i] = &row[i]
}

if err := rows.Scan(rowReferences...); err != nil {
return fmt.Errorf("scanning duplicates: %w", err)
}

duplicates = append(duplicates, row)
}

if len(duplicates) > 0 {
log.WithFields(log.Fields{"query": item.Queries, "fullPaths": item.ToDelete, "duplicates": duplicates}).Info(logMessage)
}

return nil
}

// Acknowledge merges data from temporary table to main table
// TODO: run these queries concurrently for improved performance
func (d *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error) {
Expand All @@ -495,6 +543,10 @@ func (d *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error
continue
}

if err := d.checkForDuplicates(ctx, db, stateKey, item, "before running query"); err != nil {
return nil, err
}

var queries = item.Queries
if item.Query != "" {
if len(queries) != 0 {
Expand All @@ -517,35 +569,8 @@ func (d *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error

}

var binding = d.bindingForStateKey(stateKey)
var keyStrings []string
for _, k := range binding.target.Keys {
keyStrings = append(keyStrings, k.Identifier)
}
var duplicatesQuery = fmt.Sprintf(`select %s from %s group by %s having count(*) > 1`, strings.Join(keyStrings, ","), binding.target.Identifier, strings.Join(keyStrings, ","))
rows, err := db.QueryContext(ctx, duplicatesQuery)
if err != nil {
return nil, fmt.Errorf("finding %q duplicates: %w", duplicatesQuery, err)
}
defer rows.Close()
var duplicates [][]any
for rows.Next() {
var row = make([]any, len(binding.target.Keys))

var rowReferences = make([]any, len(row))
for i := 0; i < len(row); i++ {
rowReferences[i] = &row[i]
}

if err := rows.Scan(rowReferences...); err != nil {
return nil, fmt.Errorf("scanning duplicates: %w", err)
}

duplicates = append(duplicates, row)
}

if len(duplicates) > 0 {
log.WithFields(log.Fields{"query": item.Queries, "fullPaths": item.ToDelete, "duplicates": duplicates}).Info("acknowledge")
if err := d.checkForDuplicates(ctx, db, stateKey, item, "after running query"); err != nil {
return nil, err
}
d.be.FinishedResourceCommit(path)

Expand Down

0 comments on commit b04dbcd

Please sign in to comment.