From ec3334b78d13944f0322d59ffa1b250677fc744d Mon Sep 17 00:00:00 2001 From: Will Baker Date: Fri, 8 Nov 2024 16:36:32 -0500 Subject: [PATCH] shim-airbyte-cdk: redirect rogue outputs to stdout Some imported connectors have errant `print` calls or similar that output directly to stdout, which Flow will otherwise interpret as a captured document or checkpoint. This is really ugly, but to prevent it from happening we can redirect stdout to stderr after the task outputs have been initialized. That way the printed output is treated as logs instead of capture data. --- estuary-cdk/estuary_cdk/shim_airbyte_cdk.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/estuary-cdk/estuary_cdk/shim_airbyte_cdk.py b/estuary-cdk/estuary_cdk/shim_airbyte_cdk.py index 080c6572d..d902c2e9c 100644 --- a/estuary-cdk/estuary_cdk/shim_airbyte_cdk.py +++ b/estuary-cdk/estuary_cdk/shim_airbyte_cdk.py @@ -1,5 +1,7 @@ from dataclasses import dataclass +import io from logging import Logger +import sys from pydantic import Field from typing import Any, ClassVar, Annotated, Callable, Awaitable, List, Literal import asyncio @@ -294,7 +296,6 @@ async def _run( config: EndpointConfig, connector_state: ConnectorState, ) -> None: - airbyte_streams: list[ConfiguredAirbyteStream] = [ ConfiguredAirbyteStream( stream=AirbyteStream( @@ -336,6 +337,14 @@ async def _run( AirbyteStateMessage(**rs.state) for _, rs in index.values() if rs.state ] + # Redirect any rogue `print` or similar outputs that are sent to stdout + # in the airbyte code to stderr instead so that they are interpreted as + # logs instead of captured documents or checkpoints. It's ok to + # re-assign stdout like this here without keeping a reference to it + # because the `task` output has already captured a reference to the + # original stdout. + sys.stdout = sys.stderr + for message in self.delegate.read( task.log, config, airbyte_catalog, airbyte_states ):