Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New live #3002

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions flytekit/_rich_logging/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import asyncio
import logging
import os

import click
from rich.console import Console
from rich.console import Group
from rich.live import Live
from rich.logging import RichHandler
from rich.panel import Panel
from rich.progress import Progress, BarColumn, TextColumn

import flytekit


class AsyncRichLoggerWithProgress:

def __init__(self):
# Log output stream and console
self.console = Console()
try:
width = os.get_terminal_size().columns
except Exception as e:
self.console.log(f"Failed to get terminal size: {e}")
width = 80

self.handler = RichHandler(
tracebacks_suppress=[click, flytekit],
rich_tracebacks=True,
omit_repeated_times=False,
show_path=False,
log_time_format="%H:%M:%S.%f",
console=Console(width=width),
)

# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(message)s",
handlers=[
RichHandler(rich_tracebacks=True, markup=True, console=self.console)
],
)
self.logger = logging.getLogger("rich")

# Progress bar setup
self.progress = Progress(
TextColumn("[bold cyan]{task.description}"),
BarColumn(bar_width=None),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
)
self.progress_tasks = {}

def add_progress_bar(self, task_name, total=100, **kwargs):
"""Add a progress bar."""
task_id = self.progress.add_task(task_name, total=total, **kwargs)
self.progress_tasks[task_name] = task_id

async def advance_progress(self, task_name, step=1):
"""Advance a progress bar."""
if task_name in self.progress_tasks:
self.progress.advance(self.progress_tasks[task_name], step)

async def log(self, message):
"""Log a message."""
self.logger.info(message)

async def render_live(self):
"""Render the live display with progress and logs."""
with Live(refresh_per_second=4, console=self.console) as live:
while True:
# Progress Panel
progress_panel = Panel(self.progress, title="Progress Panel", border_style="bold green")

# Group panels
group = Group(progress_panel)
live.update(group)

await asyncio.sleep(0.2)

async def __aenter__(self):
"""Enter the context manager."""
# Set the current instance in the context variable
self.render_task = asyncio.create_task(self.render_live())
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit the context manager."""
# Mark all tasks as complete and clear tasks
self.progress_tasks.clear()
self.render_task.cancel()


import asyncio


class SyncWrapper:
def __init__(self, async_context_manager):
self.async_context_manager = async_context_manager
self._entered_context = None

def __enter__(self):
# Run the async __aenter__ in a new event loop
self._entered_context = asyncio.run(self.async_context_manager.__aenter__())
return self._entered_context

def __exit__(self, exc_type, exc_val, exc_tb):
# Run the async __aexit__ in the event loop
asyncio.run(self.async_context_manager.__aexit__(exc_type, exc_val, exc_tb))


logger = AsyncRichLoggerWithProgress()


# Helper function to get the current progress manager from context
def get_current_rich_handler() -> AsyncRichLoggerWithProgress:
return logger


def get_current_sync_rich_handler() -> SyncWrapper:
return SyncWrapper(get_current_rich_handler())
80 changes: 36 additions & 44 deletions flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import yaml
from click import Context
from mashumaro.codecs.json import JSONEncoder
from rich.progress import Progress, TextColumn, TimeElapsedColumn
from typing_extensions import get_origin

from flytekit import Annotations, FlyteContext, FlyteContextManager, Labels, LaunchPlan, Literal, WorkflowExecutionPhase
Expand Down Expand Up @@ -53,7 +52,7 @@
labels_callback,
)
from flytekit.interaction.string_literals import literal_string_repr
from flytekit.loggers import logger
from flytekit.loggers import logger, add_progress_bar, advance_progress_bar
from flytekit.models import security
from flytekit.models.common import RawOutputDataConfig
from flytekit.models.interface import Parameter, Variable
Expand Down Expand Up @@ -536,34 +535,33 @@ def run_remote(
msg = "Running execution on remote."
if run_level_params.wait_execution:
msg += " Waiting to complete..."
p = Progress(TimeElapsedColumn(), TextColumn(msg), transient=True)
t = p.add_task("exec")
with p:
p.start_task(t)
execution = remote.execute(
entity,
inputs=inputs,
project=project,
domain=domain,
execution_name=run_level_params.name,
options=options_from_run_params(run_level_params),
type_hints=type_hints,
overwrite_cache=run_level_params.overwrite_cache,
envs=run_level_params.envvars,
tags=run_level_params.tags,
cluster_pool=run_level_params.cluster_pool,
execution_cluster_label=run_level_params.execution_cluster_label,
)
s = (
click.style("\n[✔] ", fg="green")
+ "Go to "
+ click.style(execution.execution_url, fg="cyan")
+ " to see execution in the console."
)
click.echo(s)
# p = Progress(TimeElapsedColumn(), TextColumn(msg), transient=True)
add_progress_bar("Remote Exec")
execution = remote.execute(
entity,
inputs=inputs,
project=project,
domain=domain,
execution_name=run_level_params.name,
options=options_from_run_params(run_level_params),
type_hints=type_hints,
overwrite_cache=run_level_params.overwrite_cache,
envs=run_level_params.envvars,
tags=run_level_params.tags,
cluster_pool=run_level_params.cluster_pool,
execution_cluster_label=run_level_params.execution_cluster_label,
)
s = (
click.style("\n[✔] ", fg="green")
+ "Go to "
+ click.style(execution.execution_url, fg="cyan")
+ " to see execution in the console."
)
click.echo(s)
advance_progress_bar("Remote Exec")

if run_level_params.wait_execution:
execution = remote.wait(execution, poll_interval=run_level_params.poll_interval)
if run_level_params.wait_execution:
execution = remote.wait(execution, poll_interval=run_level_params.poll_interval)

if run_level_params.wait_execution:
if execution.closure.phase != WorkflowExecutionPhase.SUCCEEDED:
Expand Down Expand Up @@ -904,21 +902,15 @@ def list_commands(self, ctx):

run_level_params: RunLevelParams = ctx.obj
r = run_level_params.remote_instance()
progress = Progress(transient=True)
task = progress.add_task(
f"[cyan]Gathering [{run_level_params.limit}] remote LaunchPlans...",
total=None,
)
with progress:
progress.start_task(task)
try:
self._entities = self._get_entities(
r, run_level_params.project, run_level_params.domain, run_level_params.limit
)
return self._entities
except FlyteSystemException as e:
pretty_print_exception(e)
return []
add_progress_bar("Remote Entities", total=None)
try:
self._entities = self._get_entities(
r, run_level_params.project, run_level_params.domain, run_level_params.limit
)
return self._entities
except FlyteSystemException as e:
pretty_print_exception(e)
return []

def get_command(self, ctx, name):
if self._command_name in [self.LAUNCHPLAN_COMMAND, self.WORKFLOW_COMMAND]:
Expand Down
12 changes: 7 additions & 5 deletions flytekit/clis/sdk_in_container/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,13 @@ def invoke(self, ctx: click.Context) -> typing.Any:
verbosity = ctx.params["verbose"]
log_level = get_level_from_cli_verbosity(verbosity)
logger.setLevel(log_level)
try:
return super().invoke(ctx)
except Exception as e:
pretty_print_exception(e, verbosity)
exit(1)
from flytekit import _rich_logging
with _rich_logging.get_current_sync_rich_handler():
try:
return super().invoke(ctx)
except Exception as e:
pretty_print_exception(e, verbosity)
exit(1)


def make_click_option_field(o: click.Option) -> Field:
Expand Down
70 changes: 41 additions & 29 deletions flytekit/loggers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging
import os
import typing
Expand Down Expand Up @@ -27,11 +28,13 @@
# global Python root logger is set to).
logger.propagate = False

rich_enabled = False


def set_flytekit_log_properties(
handler: typing.Optional[logging.Handler] = None,
filter: typing.Optional[logging.Filter] = None,
level: typing.Optional[int] = None,
handler: typing.Optional[logging.Handler] = None,
filter: typing.Optional[logging.Filter] = None,
level: typing.Optional[int] = None,
):
"""
flytekit logger, refers to the framework logger. It is possible to selectively tune the logging for flytekit.
Expand All @@ -54,9 +57,9 @@ def set_flytekit_log_properties(


def set_user_logger_properties(
handler: typing.Optional[logging.Handler] = None,
filter: typing.Optional[logging.Filter] = None,
level: typing.Optional[int] = None,
handler: typing.Optional[logging.Handler] = None,
filter: typing.Optional[logging.Filter] = None,
level: typing.Optional[int] = None,
):
"""
user_space logger, refers to the user's logger. It is possible to selectively tune the logging for the user.
Expand All @@ -75,9 +78,9 @@ def set_user_logger_properties(


def set_developer_properties(
handler: typing.Optional[logging.Handler] = None,
filter: typing.Optional[logging.Filter] = None,
level: typing.Optional[int] = None,
handler: typing.Optional[logging.Handler] = None,
filter: typing.Optional[logging.Filter] = None,
level: typing.Optional[int] = None,
):
"""
developer logger is only used for debugging. It is possible to selectively tune the logging for the developer.
Expand Down Expand Up @@ -141,32 +144,41 @@ def is_rich_logging_enabled() -> bool:


def upgrade_to_rich_logging(log_level: typing.Optional[int] = logging.WARNING):
import click
from rich.console import Console
from rich.logging import RichHandler

import flytekit

try:
width = os.get_terminal_size().columns
except Exception as e:
logger.debug(f"Failed to get terminal size: {e}")
width = 80

handler = RichHandler(
tracebacks_suppress=[click, flytekit],
rich_tracebacks=True,
omit_repeated_times=False,
show_path=False,
log_time_format="%H:%M:%S.%f",
console=Console(width=width),
)
from flytekit import _rich_logging
rich_handler = _rich_logging.get_current_rich_handler()

handler = rich_handler.handler
formatter = logging.Formatter(fmt="%(filename)s:%(lineno)d - %(message)s")
handler.setFormatter(formatter)
set_flytekit_log_properties(handler, None, _get_env_logging_level(default_level=log_level))
set_user_logger_properties(handler, None, logging.INFO)
set_developer_properties(handler, None, _get_dev_env_logging_level())
global rich_enabled
rich_enabled = True


def add_progress_bar(task: str, total: int = 1, **kwargs):
"""
Creates a new progress bar with name "task"
"""
global rich_enabled
if not rich_enabled:
return 0
from flytekit import _rich_logging
rich_handler = _rich_logging.get_current_rich_handler()
rich_handler.add_progress_bar(task, total=total, **kwargs)


def advance_progress_bar(task: str, advance: float = 1):
"""
Advances a progress bar with name "task"
"""
global rich_enabled
if not rich_enabled:
return 0
from flytekit import _rich_logging
rich_handler = _rich_logging.get_current_rich_handler()
rich_handler.advance_progress(task, advance)


def get_level_from_cli_verbosity(verbosity: int) -> int:
Expand Down
Loading