Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source-*-batch: Timeouts and partial-progress logging #2267

Merged
merged 3 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion source-bigquery-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,21 @@ const (
// on overall throughput, and isn't really needed anyway. So instead we emit one for
// every N rows, plus one when the query results are fully processed.
documentsPerCheckpoint = 1000

// We have a watchdog timeout which fires if we're sitting there waiting for query
// results but haven't received anything for a while. This constant specifies the
// duration of that timeout while waiting for the first result row.
//
// It is useful to have two different timeouts for the first result row versus any
// subsequent rows, because sometimes when using a cursor the queries can have some
// nontrivial startup cost and we don't want to make things any more failure-prone
// than we have to.
pollingWatchdogFirstRowTimeout = 30 * time.Minute

// The duration of the no-data watchdog for subsequent rows after the first one.
// This timeout can be less generous, because after the first row is received we
// ought to be getting a consistent stream of results until the query completes.
pollingWatchdogTimeout = 5 * time.Minute
)

var (
Expand Down Expand Up @@ -615,6 +630,14 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
}).Info("executing query")
var pollTime = time.Now().UTC()

// Set up a watchdog timeout which will terminate the capture task if no data is
// received after a long period of time. The deferred stop ensures that the timeout
// will always be cancelled for good when the polling operation finishes.
var watchdog = time.AfterFunc(pollingWatchdogFirstRowTimeout, func() {
log.WithField("name", res.Name).Fatal("polling timed out")
})
defer watchdog.Stop()

var q = c.DB.Query(query)
var params []bigquery.QueryParameter
for idx, val := range cursorValues {
Expand Down Expand Up @@ -643,6 +666,7 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
} else if err != nil {
return fmt.Errorf("error reading result row: %w", err)
}
watchdog.Reset(pollingWatchdogTimeout) // Reset the no-data watchdog timeout after each row received

if shape == nil {
// Construct a Shape corresponding to the columns of these result rows
Expand Down Expand Up @@ -701,12 +725,19 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
return err
}
}
if count%100000 == 1 {
log.WithFields(log.Fields{
"name": res.Name,
"count": count,
}).Info("processing query results")
}
}

log.WithFields(log.Fields{
"name": res.Name,
"query": query,
"count": count,
}).Info("query complete")
}).Info("polling complete")
state.LastPolled = pollTime
if err := c.streamStateCheckpoint(stateKey, state); err != nil {
return err
Expand Down
35 changes: 33 additions & 2 deletions source-mysql-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ const (
// on overall throughput, and isn't really needed anyway. So instead we emit one for
// every N rows, plus one when the query results are fully processed.
documentsPerCheckpoint = 1000

// We have a watchdog timeout which fires if we're sitting there waiting for query
// results but haven't received anything for a while. This constant specifies the
// duration of that timeout while waiting for the first result row.
//
// It is useful to have two different timeouts for the first result row versus any
// subsequent rows, because sometimes when using a cursor the queries can have some
// nontrivial startup cost and we don't want to make things any more failure-prone
// than we have to.
pollingWatchdogFirstRowTimeout = 30 * time.Minute

// The duration of the no-data watchdog for subsequent rows after the first one.
// This timeout can be less generous, because after the first row is received we
// ought to be getting a consistent stream of results until the query completes.
pollingWatchdogTimeout = 5 * time.Minute
)

// BatchSQLDriver represents a generic "batch SQL" capture behavior, parameterized
Expand Down Expand Up @@ -672,6 +687,14 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
}).Info("executing query")
var pollTime = time.Now().UTC()

// Set up a watchdog timeout which will terminate the capture task if no data is
// received after a long period of time. The deferred stop ensures that the timeout
// will always be cancelled for good when the polling operation finishes.
var watchdog = time.AfterFunc(pollingWatchdogFirstRowTimeout, func() {
log.WithField("name", res.Name).Fatal("polling timed out")
})
defer watchdog.Stop()

// There is no helper function for a streaming select query _with arguments_,
// so we have to drop down a level and prepare the statement ourselves here.
stmt, err := c.DB.Prepare(query)
Expand All @@ -690,6 +713,8 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
var serializedDocument []byte

if err := stmt.ExecuteSelectStreaming(&result, func(row []mysql.FieldValue) error {
watchdog.Reset(pollingWatchdogTimeout) // Reset the no-data watchdog timeout after each row received

if shape == nil {
// Construct a Shape corresponding to the columns of these result rows
var fieldNames = []string{"_meta"}
Expand Down Expand Up @@ -741,22 +766,28 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
cursorValues[i] = rowValues[j]
}
state.CursorValues = cursorValues

count++
if count%documentsPerCheckpoint == 0 {
if err := c.streamStateCheckpoint(stateKey, state); err != nil {
return err
}
}
if count%100000 == 1 {
log.WithFields(log.Fields{
"name": res.Name,
"count": count,
}).Info("processing query results")
}
return nil
}, nil, args...); err != nil {
return fmt.Errorf("error executing backfill query: %w", err)
}

log.WithFields(log.Fields{
"name": res.Name,
"query": query,
"count": count,
}).Info("query complete")
}).Info("polling complete")
state.LastPolled = pollTime
if err := c.streamStateCheckpoint(stateKey, state); err != nil {
return err
Expand Down
10 changes: 8 additions & 2 deletions source-mysql-batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"strings"
"time"

cerrors "github.com/estuary/connectors/go/connector-errors"
networkTunnel "github.com/estuary/connectors/go/network-tunnel"
Expand Down Expand Up @@ -98,19 +99,24 @@ func connectMySQL(ctx context.Context, cfg *Config) (*client.Conn, error) {
c.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})
return nil
}
var withTimeouts = func(c *client.Conn) error {
c.ReadTimeout = 60 * time.Second
c.WriteTimeout = 60 * time.Second
return nil
}
// The following if-else chain looks somewhat complicated but it's really very simple.
// * We'd prefer to use TLS, so we first try to connect with TLS, and then if that fails
// we try again without.
// * If either error is an incorrect username/password then we just report that.
// * Otherwise we report both errors because it's better to be clear what failed and how.
// * Except if the non-TLS connection specifically failed because TLS is required then
// we don't need to mention that and just return the with-TLS error.
if connWithTLS, errWithTLS := client.Connect(address, cfg.User, cfg.Password, cfg.Advanced.DBName, withTLS); errWithTLS == nil {
if connWithTLS, errWithTLS := client.Connect(address, cfg.User, cfg.Password, cfg.Advanced.DBName, withTimeouts, withTLS); errWithTLS == nil {
log.WithField("addr", cfg.Address).Info("connected with TLS")
conn = connWithTLS
} else if errors.As(errWithTLS, &mysqlErr) && mysqlErr.Code == mysql.ER_ACCESS_DENIED_ERROR {
return nil, cerrors.NewUserError(mysqlErr, "incorrect username or password")
} else if connWithoutTLS, errWithoutTLS := client.Connect(address, cfg.User, cfg.Password, cfg.Advanced.DBName); errWithoutTLS == nil {
} else if connWithoutTLS, errWithoutTLS := client.Connect(address, cfg.User, cfg.Password, cfg.Advanced.DBName, withTimeouts); errWithoutTLS == nil {
log.WithField("addr", cfg.Address).Info("connected without TLS")
conn = connWithoutTLS
} else if errors.As(errWithoutTLS, &mysqlErr) && mysqlErr.Code == mysql.ER_ACCESS_DENIED_ERROR {
Expand Down
33 changes: 32 additions & 1 deletion source-postgres-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@ const (
// throughput, and isn't really needed anyway. So instead we emit one for every
// N rows, plus another when the query results are fully processed.
documentsPerCheckpoint = 1000

// We have a watchdog timeout which fires if we're sitting there waiting for query
// results but haven't received anything for a while. This constant specifies the
// duration of that timeout while waiting for the first result row.
//
// It is useful to have two different timeouts for the first result row versus any
// subsequent rows, because sometimes when using a cursor the queries can have some
// nontrivial startup cost and we don't want to make things any more failure-prone
// than we have to.
pollingWatchdogFirstRowTimeout = 30 * time.Minute

// The duration of the no-data watchdog for subsequent rows after the first one.
// This timeout can be less generous, because after the first row is received we
// ought to be getting a consistent stream of results until the query completes.
pollingWatchdogTimeout = 5 * time.Minute
)

// BatchSQLDriver represents a generic "batch SQL" capture behavior, parameterized
Expand Down Expand Up @@ -643,6 +658,14 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
}).Info("executing query")
var pollTime = time.Now().UTC()

// Set up a watchdog timeout which will terminate the capture task if no data is
// received after a long period of time. The deferred stop ensures that the timeout
// will always be cancelled for good when the polling operation finishes.
var watchdog = time.AfterFunc(pollingWatchdogFirstRowTimeout, func() {
log.WithField("name", res.Name).Fatal("polling timed out")
})
defer watchdog.Stop()

rows, err := c.DB.QueryContext(ctx, query, cursorValues...)
if err != nil {
return fmt.Errorf("error executing query: %w", err)
Expand Down Expand Up @@ -689,6 +712,7 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
if err := rows.Scan(columnPointers...); err != nil {
return fmt.Errorf("error scanning result row: %w", err)
}
watchdog.Reset(pollingWatchdogTimeout) // Reset the no-data watchdog timeout after each row received

for idx, val := range columnValues {
var translatedVal, err = c.TranslateValue(val, columnTypes[idx].DatabaseTypeName())
Expand Down Expand Up @@ -725,12 +749,19 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
return err
}
}
if count%100000 == 1 {
log.WithFields(log.Fields{
"name": res.Name,
"count": count,
}).Info("processing query results")
}
}

log.WithFields(log.Fields{
"name": res.Name,
"query": query,
"count": count,
}).Info("query complete")
}).Info("polling complete")
if err := rows.Err(); err != nil {
return fmt.Errorf("error processing results iterator: %w", err)
}
Expand Down
33 changes: 32 additions & 1 deletion source-redshift-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ const (
// on overall throughput, and isn't really needed anyway. So instead we emit one for
// every N rows, plus one when the query results are fully processed.
documentsPerCheckpoint = 1000

// We have a watchdog timeout which fires if we're sitting there waiting for query
// results but haven't received anything for a while. This constant specifies the
// duration of that timeout while waiting for the first result row.
//
// It is useful to have two different timeouts for the first result row versus any
// subsequent rows, because sometimes when using a cursor the queries can have some
// nontrivial startup cost and we don't want to make things any more failure-prone
// than we have to.
pollingWatchdogFirstRowTimeout = 30 * time.Minute

// The duration of the no-data watchdog for subsequent rows after the first one.
// This timeout can be less generous, because after the first row is received we
// ought to be getting a consistent stream of results until the query completes.
pollingWatchdogTimeout = 5 * time.Minute
)

var featureFlagDefaults = map[string]bool{
Expand Down Expand Up @@ -816,6 +831,14 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
}).Info("executing query")
var pollTime = time.Now().UTC()

// Set up a watchdog timeout which will terminate the capture task if no data is
// received after a long period of time. The deferred stop ensures that the timeout
// will always be cancelled for good when the polling operation finishes.
var watchdog = time.AfterFunc(pollingWatchdogFirstRowTimeout, func() {
log.WithField("name", res.Name).Fatal("polling timed out")
})
defer watchdog.Stop()

rows, err := c.DB.QueryContext(ctx, query, cursorValues...)
if err != nil {
return fmt.Errorf("error executing query: %w", err)
Expand Down Expand Up @@ -862,6 +885,7 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
if err := rows.Scan(columnPointers...); err != nil {
return fmt.Errorf("error scanning result row: %w", err)
}
watchdog.Reset(pollingWatchdogTimeout) // Reset the no-data watchdog timeout after each row received

for idx, val := range columnValues {
var translatedVal, err = c.TranslateValue(val, columnTypes[idx].DatabaseTypeName())
Expand Down Expand Up @@ -898,12 +922,19 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
return err
}
}
if count%100000 == 1 {
log.WithFields(log.Fields{
"name": res.Name,
"count": count,
}).Info("processing query results")
}
}

log.WithFields(log.Fields{
"name": res.Name,
"query": query,
"count": count,
}).Info("query complete")
}).Info("polling complete")
if err := rows.Err(); err != nil {
return fmt.Errorf("error processing results iterator: %w", err)
}
Expand Down
Loading