Skip to content

Commit

Permalink
materialize-databricks: debugging logs for duplicates
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Jan 9, 2025
1 parent a7fa34c commit c33f43c
Showing 1 changed file with 80 additions and 2 deletions.
82 changes: 80 additions & 2 deletions materialize-databricks/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,15 @@ func (t *transactor) addBinding(target sql.Table) error {
return nil
}

// minMax pairs a lower and an upper bound of arbitrary (dynamically typed)
// values. Load keeps one of these per binding identifier, seeded from the
// first component of a converted load key, and reports the collection in its
// debugging log output.
type minMax struct {
	// Min is the lower bound; Max is the upper bound. Both are untyped so
	// that keys of different kinds (e.g. int or string) can be recorded.
	Min, Max any
}

func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage) error) error {
var ctx = it.Context()

var bindingMinMaxKeys = make(map[string]minMax)
for it.Next() {
var b = d.bindings[it.Binding]

Expand All @@ -246,6 +252,31 @@ func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage)
return fmt.Errorf("converting Load key: %w", err)
} else if err := b.loadFile.encodeRow(converted); err != nil {
return fmt.Errorf("encoding row for load: %w", err)
} else {
bindingName := b.target.Identifier
if bmx, ok := bindingMinMaxKeys[bindingName]; !ok {
bindingMinMaxKeys[bindingName] = minMax{
Min: converted[0],
Max: converted[0],
}
} else {
switch v := converted[0].(type) {
case int:
if v < bmx.Min.(int) {
bmx.Min = v
}
if v > bmx.Max.(int) {
bmx.Max = v
}
case string:
if v < bmx.Min.(string) {
bmx.Min = v
}
if v > bmx.Max.(string) {
bmx.Max = v
}
}
}
}
}

Expand All @@ -270,7 +301,7 @@ func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage)
toDelete = append(toDelete, fullPaths...)
}
}
defer d.deleteFiles(ctx, toDelete)
//defer d.deleteFiles(ctx, toDelete)

if it.Err() != nil {
return it.Err()
Expand All @@ -293,7 +324,9 @@ func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage)
}
defer rows.Close()
d.be.FinishedEvaluatingLoads()
log.WithField("loadQuery", unionQuery)

var loadedCounts = make(map[string]int)
for rows.Next() {
var binding int
var document string
Expand All @@ -305,7 +338,11 @@ func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage)
return err
}
}

bindingName := d.bindings[binding].target.Identifier
loadedCounts[bindingName] = loadedCounts[bindingName] + 1
}
log.WithFields(log.Fields{"loadQuery": unionQuery, "loaded": loadedCounts, "minMaxKeys": bindingMinMaxKeys, "fullPaths": toDelete}).Info("load")

if err = rows.Err(); err != nil {
return fmt.Errorf("querying Loads: %w", err)
Expand Down Expand Up @@ -477,11 +514,43 @@ func (d *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error
}
return nil, fmt.Errorf("query %q failed: %w", query, err)
}

}

var binding = d.bindingForStateKey(stateKey)
var keyStrings []string
for _, k := range binding.target.Keys {
keyStrings = append(keyStrings, k.Identifier)
}
var duplicatesQuery = fmt.Sprintf(`select %s from %s group by %s having count(*) > 1`, strings.Join(keyStrings, ","), binding.target.Identifier, strings.Join(keyStrings, ","))
rows, err := db.QueryContext(ctx, duplicatesQuery)
if err != nil {
return nil, fmt.Errorf("finding %q duplicates: %w", duplicatesQuery, err)
}
defer rows.Close()
var duplicates [][]any
for rows.Next() {
var row = make([]any, len(binding.target.Keys))

var rowReferences = make([]any, len(row))
for i := 0; i < len(row); i++ {
rowReferences[i] = &row[i]
}

if err := rows.Scan(rowReferences...); err != nil {
return nil, fmt.Errorf("scanning duplicates: %w", err)
}

duplicates = append(duplicates, row)
}

if len(duplicates) > 0 {
log.WithFields(log.Fields{"query": item.Queries, "fullPaths": item.ToDelete, "duplicates": duplicates}).Info("acknowledge")
}
d.be.FinishedResourceCommit(path)

// Cleanup files.
d.deleteFiles(ctx, item.ToDelete)
//d.deleteFiles(ctx, item.ToDelete)
}

d.cpRecovery = false
Expand All @@ -507,6 +576,15 @@ func (d *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error
return &pf.ConnectorState{UpdatedJson: json.RawMessage(checkpointJSON), MergePatch: true}, nil
}

// bindingForStateKey looks through the transactor's bindings and returns the
// first one whose target StateKey equals stateKey. It returns nil when no
// binding matches, so callers must check the result before dereferencing it.
func (d *transactor) bindingForStateKey(stateKey string) *binding {
	for idx := range d.bindings {
		if candidate := d.bindings[idx]; candidate.target.StateKey == stateKey {
			return candidate
		}
	}
	return nil
}

func (d *transactor) pathForStateKey(stateKey string) []string {
for _, b := range d.bindings {
if b.target.StateKey == stateKey {
Expand Down

0 comments on commit c33f43c

Please sign in to comment.