Skip to content

Commit

Permalink
source-*-batch: Add a watchdog timeout on query results
Browse files Browse the repository at this point in the history
The timeout is 30 minutes for the first result row and 5 minutes
for subsequent result rows, which is absurdly generous any time
things are working correctly. Pretty much the only times we ought
to see these fire are:

 - When the database connection has silently dropped on us (which
   is a thing that happens sometimes, especially when using SSH
   tunneling)
 - When the polling query is performing a full-table sort on some
   massive dataset. And generally speaking if that hasn't finished
   after 30 minutes it's not likely that we'll actually end up with
   a successful capture and a pleasant user experience anyway.
  • Loading branch information
willdonnelly committed Jan 13, 2025
1 parent f064caf commit 7f596b0
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 0 deletions.
24 changes: 24 additions & 0 deletions source-bigquery-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,21 @@ const (
// on overall throughput, and isn't really needed anyway. So instead we emit one for
// every N rows, plus one when the query results are fully processed.
documentsPerCheckpoint = 1000

// We have a watchdog timeout which fires if we're sitting there waiting for query
// results but haven't received anything for a while. This constant specifies the
// duration of that timeout while waiting for the first result row.
//
// It is useful to have two different timeouts for the first result row versus any
// subsequent rows, because sometimes when using a cursor the queries can have some
// nontrivial startup cost and we don't want to make things any more failure-prone
// than we have to.
pollingWatchdogFirstRowTimeout = 30 * time.Minute

// The duration of the no-data watchdog for subsequent rows after the first one.
// This timeout can be less generous, because after the first row is received we
// ought to be getting a consistent stream of results until the query completes.
pollingWatchdogTimeout = 5 * time.Minute
)

var (
Expand Down Expand Up @@ -615,6 +630,14 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
}).Info("executing query")
var pollTime = time.Now().UTC()

// Set up a watchdog timeout which will terminate the capture task if no data is
// received after a long period of time. The deferred stop ensures that the timeout
// will always be cancelled for good when the polling operation finishes.
var watchdog = time.AfterFunc(pollingWatchdogFirstRowTimeout, func() {
log.WithField("name", res.Name).Fatal("polling timed out")
})
defer watchdog.Stop()

var q = c.DB.Query(query)
var params []bigquery.QueryParameter
for idx, val := range cursorValues {
Expand Down Expand Up @@ -643,6 +666,7 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
} else if err != nil {
return fmt.Errorf("error reading result row: %w", err)
}
watchdog.Reset(pollingWatchdogTimeout) // Reset the no-data watchdog timeout after each row received

if shape == nil {
// Construct a Shape corresponding to the columns of these result rows
Expand Down
25 changes: 25 additions & 0 deletions source-mysql-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ const (
// on overall throughput, and isn't really needed anyway. So instead we emit one for
// every N rows, plus one when the query results are fully processed.
documentsPerCheckpoint = 1000

// We have a watchdog timeout which fires if we're sitting there waiting for query
// results but haven't received anything for a while. This constant specifies the
// duration of that timeout while waiting for the first result row.
//
// It is useful to have two different timeouts for the first result row versus any
// subsequent rows, because sometimes when using a cursor the queries can have some
// nontrivial startup cost and we don't want to make things any more failure-prone
// than we have to.
pollingWatchdogFirstRowTimeout = 30 * time.Minute

// The duration of the no-data watchdog for subsequent rows after the first one.
// This timeout can be less generous, because after the first row is received we
// ought to be getting a consistent stream of results until the query completes.
pollingWatchdogTimeout = 5 * time.Minute
)

// BatchSQLDriver represents a generic "batch SQL" capture behavior, parameterized
Expand Down Expand Up @@ -672,6 +687,14 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
}).Info("executing query")
var pollTime = time.Now().UTC()

// Set up a watchdog timeout which will terminate the capture task if no data is
// received after a long period of time. The deferred stop ensures that the timeout
// will always be cancelled for good when the polling operation finishes.
var watchdog = time.AfterFunc(pollingWatchdogFirstRowTimeout, func() {
log.WithField("name", res.Name).Fatal("polling timed out")
})
defer watchdog.Stop()

// There is no helper function for a streaming select query _with arguments_,
// so we have to drop down a level and prepare the statement ourselves here.
stmt, err := c.DB.Prepare(query)
Expand All @@ -690,6 +713,8 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
var serializedDocument []byte

if err := stmt.ExecuteSelectStreaming(&result, func(row []mysql.FieldValue) error {
watchdog.Reset(pollingWatchdogTimeout) // Reset the no-data watchdog timeout after each row received

if shape == nil {
// Construct a Shape corresponding to the columns of these result rows
var fieldNames = []string{"_meta"}
Expand Down
24 changes: 24 additions & 0 deletions source-postgres-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@ const (
// throughput, and isn't really needed anyway. So instead we emit one for every
// N rows, plus another when the query results are fully processed.
documentsPerCheckpoint = 1000

// We have a watchdog timeout which fires if we're sitting there waiting for query
// results but haven't received anything for a while. This constant specifies the
// duration of that timeout while waiting for the first result row.
//
// It is useful to have two different timeouts for the first result row versus any
// subsequent rows, because sometimes when using a cursor the queries can have some
// nontrivial startup cost and we don't want to make things any more failure-prone
// than we have to.
pollingWatchdogFirstRowTimeout = 30 * time.Minute

// The duration of the no-data watchdog for subsequent rows after the first one.
// This timeout can be less generous, because after the first row is received we
// ought to be getting a consistent stream of results until the query completes.
pollingWatchdogTimeout = 5 * time.Minute
)

// BatchSQLDriver represents a generic "batch SQL" capture behavior, parameterized
Expand Down Expand Up @@ -643,6 +658,14 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
}).Info("executing query")
var pollTime = time.Now().UTC()

// Set up a watchdog timeout which will terminate the capture task if no data is
// received after a long period of time. The deferred stop ensures that the timeout
// will always be cancelled for good when the polling operation finishes.
var watchdog = time.AfterFunc(pollingWatchdogFirstRowTimeout, func() {
log.WithField("name", res.Name).Fatal("polling timed out")
})
defer watchdog.Stop()

rows, err := c.DB.QueryContext(ctx, query, cursorValues...)
if err != nil {
return fmt.Errorf("error executing query: %w", err)
Expand Down Expand Up @@ -689,6 +712,7 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
if err := rows.Scan(columnPointers...); err != nil {
return fmt.Errorf("error scanning result row: %w", err)
}
watchdog.Reset(pollingWatchdogTimeout) // Reset the no-data watchdog timeout after each row received

for idx, val := range columnValues {
var translatedVal, err = c.TranslateValue(val, columnTypes[idx].DatabaseTypeName())
Expand Down
24 changes: 24 additions & 0 deletions source-redshift-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ const (
// on overall throughput, and isn't really needed anyway. So instead we emit one for
// every N rows, plus one when the query results are fully processed.
documentsPerCheckpoint = 1000

// We have a watchdog timeout which fires if we're sitting there waiting for query
// results but haven't received anything for a while. This constant specifies the
// duration of that timeout while waiting for the first result row.
//
// It is useful to have two different timeouts for the first result row versus any
// subsequent rows, because sometimes when using a cursor the queries can have some
// nontrivial startup cost and we don't want to make things any more failure-prone
// than we have to.
pollingWatchdogFirstRowTimeout = 30 * time.Minute

// The duration of the no-data watchdog for subsequent rows after the first one.
// This timeout can be less generous, because after the first row is received we
// ought to be getting a consistent stream of results until the query completes.
pollingWatchdogTimeout = 5 * time.Minute
)

var featureFlagDefaults = map[string]bool{
Expand Down Expand Up @@ -816,6 +831,14 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
}).Info("executing query")
var pollTime = time.Now().UTC()

// Set up a watchdog timeout which will terminate the capture task if no data is
// received after a long period of time. The deferred stop ensures that the timeout
// will always be cancelled for good when the polling operation finishes.
var watchdog = time.AfterFunc(pollingWatchdogFirstRowTimeout, func() {
log.WithField("name", res.Name).Fatal("polling timed out")
})
defer watchdog.Stop()

rows, err := c.DB.QueryContext(ctx, query, cursorValues...)
if err != nil {
return fmt.Errorf("error executing query: %w", err)
Expand Down Expand Up @@ -862,6 +885,7 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
if err := rows.Scan(columnPointers...); err != nil {
return fmt.Errorf("error scanning result row: %w", err)
}
watchdog.Reset(pollingWatchdogTimeout) // Reset the no-data watchdog timeout after each row received

for idx, val := range columnValues {
var translatedVal, err = c.TranslateValue(val, columnTypes[idx].DatabaseTypeName())
Expand Down

0 comments on commit 7f596b0

Please sign in to comment.