Skip to content

Commit

Permalink
Run logs python (#1271)
Browse files Browse the repository at this point in the history
* Python client support for stdout logging

* Add minimal_fasttrack.py example, with stdout logging to server
  • Loading branch information
suprjinx authored Jun 17, 2024
1 parent 80be919 commit 560fbf7
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 5 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ test-python-integration-aim: ## run the Aim python integration tests.
@echo ">>> Running Aim python integration tests."
@go run tests/integration/python/main.go -targets aim

# Invokes the Go integration-test driver (tests/integration/python/main.go),
# passing `-targets fml_client` to select the FasttrackML client test target.
.PHONY: test-python-integration-client
test-python-integration-client: ## run the FML Client python integration tests.
@echo ">>> Running FasttrackmlClient python integration tests."
@go run tests/integration/python/main.go -targets fml_client

#
# Container test targets.
#
Expand Down
69 changes: 69 additions & 0 deletions python/client_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import time
from random import randint, random

from fasttrackml.entities.metric import Metric

import fasttrackml
from fasttrackml import FasttrackmlClient


def print_metric_info(history):
    """Print each metric in ``history`` as labelled lines followed by ``--``.

    Every entry must expose ``key``, ``value``, ``step``, ``timestamp`` and
    ``context`` attributes; they are printed in that order, one per line.
    """
    # (label shown to the user, attribute read from the metric)
    labelled_fields = (
        ("name", "key"),
        ("value", "value"),
        ("step", "step"),
        ("timestamp", "timestamp"),
        ("context", "context"),
    )
    for metric in history:
        for label, attribute in labelled_fields:
            print(f"{label}: {getattr(metric, attribute)}")
        print("--")


def main():
    """Exercise the FasttrackmlClient API against a server on localhost:5000.

    Creates a run in experiment "0", enables terminal-output capture, logs a
    parameter and several metrics (single and batched, with and without
    context), prints the stored metric histories and finally terminates the
    run.
    """
    fasttrackml.set_tracking_uri("http://localhost:5000")
    # Creating an instance of the Fasttrackml client
    client = FasttrackmlClient()

    # Creating a new run
    experiment_id = "0"
    run = client.create_run(experiment_id)
    run_id = run.info.run_id

    # Everything after run creation is wrapped so the run is always closed:
    # set_terminated() is also what stops output capture, and without it a
    # failure below would leave stdout/stderr redirected and the capture
    # thread running.
    try:
        # Set client to capture terminal output
        client.init_output_logging(run_id)

        # Logging terminal output
        print("Here's some terminal output.")

        # Logging a parameter
        param_key = "param1"
        param_value = randint(0, 100)
        client.log_param(run_id, param_key, param_value)

        metric_key = "foo"
        # Logging metrics with context
        client.log_metric(run_id, metric_key, random(), context={"context_key": "context_value1"})
        client.log_metric(run_id, metric_key, random() + 1, context={"context_key": "context_value2"})
        # Logging metrics without context
        client.log_metric(run_id, metric_key, random() + 2)

        # Logging a batch of metrics
        timestamp = int(time.time() * 1000)
        metrics = [
            Metric("loss", 0.2, timestamp, 1, context={"context_key": "context_value3"}),
            Metric("precision", 0.92, timestamp, 1, context={"context_key": "context_value4"}),
        ]
        client.log_batch(run_id, metrics=metrics)

        # Retrieving metric histories
        metric_keys = [metric_key, "loss"]
        metric_histories_df = client.get_metric_histories(run_ids=[run_id], metric_keys=metric_keys)

        print(metric_histories_df)

        print_metric_info(client.get_metric_history(run_id, metric_key))
    finally:
        # Closing the run
        client.set_terminated(run_id)


if __name__ == "__main__":
main()
44 changes: 39 additions & 5 deletions python/client_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import socket
import subprocess
import time
import uuid
Expand All @@ -9,19 +10,37 @@

from fasttrackml import FasttrackmlClient

LOCALHOST = "127.0.0.1"

@pytest.fixture(scope="session", autouse=True)
def server():

@pytest.fixture(scope="session")
def fml_address():
# Launch the fml server
process = subprocess.Popen(["fml", "server"])
port = get_safe_port()
return f"{LOCALHOST}:{port}"


def get_safe_port():
    """Returns an ephemeral port that is very likely to be free to bind to.

    Binds a TCP socket to port 0 on ``LOCALHOST`` so the OS picks a free port,
    reads the chosen port number back, and releases the socket again.
    """
    # The context manager closes the socket even if bind()/getsockname() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((LOCALHOST, 0))
        return sock.getsockname()[1]


@pytest.fixture(scope="session", autouse=True)
def server(fml_address):
    """Run an ``fml server`` subprocess for the whole test session.

    The listen address is handed to the server through the
    ``FML_LISTEN_ADDRESS`` environment variable. Yields the ``Popen`` handle;
    on teardown the process is killed and reaped.
    """
    env = {**os.environ, "FML_LISTEN_ADDRESS": fml_address}
    process = subprocess.Popen(["fml", "server"], env=env)
    yield process
    # Kill the fml server
    # NOTE(review): the pause presumably gives pending client output flushes
    # time to reach the server before it goes away - confirm.
    time.sleep(3)
    process.kill()
    # Reap the child so it does not linger as a zombie process.
    process.wait()


@pytest.fixture
def client():
return FasttrackmlClient("http://localhost:5000")
def client(fml_address):
return FasttrackmlClient(f"http://{fml_address}")


@pytest.fixture
Expand Down Expand Up @@ -81,3 +100,18 @@ def test_log_batch(client, server, run):
metric_keys = [metric_key2]
metric_histories_df = client.get_metric_histories(run_ids=[run.info.run_id], metric_keys=metric_keys)
assert metric_histories_df.value[0] == metric_key2_value


def test_log_output(client, server, run):
    """Send 100 two-line payloads via ``log_output`` and expect ``None`` each time."""
    # test logging some output directly
    for _ in range(100):
        log_data = str(uuid.uuid4()) + "\n" + str(uuid.uuid4())
        # Identity check: `== None` can be fooled by a custom __eq__.
        assert client.log_output(run.info.run_id, log_data) is None


def test_init_output_logging(client, server, run):
    """Enable output capture for the run, then print 100 two-line UUID payloads."""
    # test logging some output implicitly
    client.init_output_logging(run.info.run_id)
    for _ in range(100):
        first, second = uuid.uuid4(), uuid.uuid4()
        print(f"{first}\n{second}")
7 changes: 7 additions & 0 deletions python/fasttrackml/_tracking_service/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,10 @@ def chunk_list(input_list, chunk_size):
"""Yield successive chunks from input_list."""
for i in range(0, len(input_list), chunk_size):
yield input_list[i : i + chunk_size]

def log_output(self, run_id: str, data: str):
    """Forward ``data`` for ``run_id`` to the custom store's ``log_output``.

    Args:
        run_id: String ID of the run.
        data: The text to log.

    The store's return value is discarded; this method returns ``None``.
    """
    store = self.custom_store
    store.log_output(run_id, data)
123 changes: 123 additions & 0 deletions python/fasttrackml/client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import io
import sys
import threading
import time
from typing import Dict, List, Optional, Sequence

import pandas as pd
Expand All @@ -20,6 +24,92 @@ def __init__(self, tracking_uri: Optional[str] = None, registry_uri: Optional[st
self._tracking_client_mlflow = self._tracking_client
self._tracking_client = FasttrackmlTrackingServiceClient(final_tracking_uri)

def init_output_logging(self, run_id):
    """
    Capture terminal output (stdout and stderr) of the current script and send it to
    the Fasttrackml server. The output will be visible in the Run detail > Logs tab.

    ``sys.stdout`` and ``sys.stderr`` are replaced by in-memory buffers and a
    background thread running ``_capture_output`` is started for ``run_id``.
    Calling this again while capture is already active is a no-op.

    Args:
        run_id: String ID of the run

    .. code-block:: python
        :caption: Example

        from fasttrackml import FasttrackmlClient

        # Create a run under the default experiment (whose id is '0').
        # Since these are low-level CRUD operations, this method will create a run.
        # To end the run, you'll have to explicitly end it.
        client = FasttrackmlClient()
        experiment_id = "0"
        run = client.create_run(experiment_id)
        print_run_info(run)
        print("--")

        # start logging the terminal output
        client.init_output_logging(run.info.run_id)
        print("This will be logged in Fasttrackml")
        client.set_terminated(run.info.run_id)
    """
    if getattr(self, "is_capture_logging", False):
        # Redirecting twice would store our own buffers as the "original"
        # streams (losing the real terminal) and start a second thread
        # draining the same buffers.
        return

    print("Capturing output")
    self.original_stdout, self.original_stderr = sys.stdout, sys.stderr
    self.stdout_buffer, self.stderr_buffer = io.StringIO(), io.StringIO()
    sys.stdout, sys.stderr = self.stdout_buffer, self.stderr_buffer
    self.is_capture_logging = True
    # Keep a handle on the thread so it can be inspected or joined later.
    self._capture_thread = threading.Thread(target=self._capture_output, args=(run_id,))
    self._capture_thread.start()

def _capture_output(self, run_id):
    """Thread body: flush captured output every 3 seconds until capture stops.

    Loops while ``self.is_capture_logging`` is true. When the loop ends,
    whether because capture was switched off or because a flush raised, the
    original ``sys.stdout``/``sys.stderr`` are put back so later output is no
    longer swallowed by the capture buffers. On a normal stop, one last flush
    drains whatever was written before the streams were restored.

    Args:
        run_id: String ID of the run the output is logged to
    """
    try:
        while self.is_capture_logging:
            self._flush_buffers(run_id)
            time.sleep(3)
    finally:
        self.is_capture_logging = False
        # Only undo our own redirection; leave the streams alone if someone
        # else has replaced them in the meantime.
        if sys.stdout is self.stdout_buffer:
            sys.stdout = self.original_stdout
        if sys.stderr is self.stderr_buffer:
            sys.stderr = self.original_stderr
    self._flush_buffers(run_id)

def _flush_buffers(self, run_id):
    """Drain the stdout and stderr capture buffers and forward their content.

    For each stream, in the order stdout then stderr: the buffer is read and
    emptied; if anything was captured it is sent through ``self.log_output``
    with a ``STDOUT: `` / ``STDERR: `` prefix and then echoed, followed by a
    newline, to the corresponding original stream unless that stream is
    closed.

    Args:
        run_id: String ID of the run the output is logged to
    """
    streams = (
        ("STDOUT: ", self.stdout_buffer, self.original_stdout),
        ("STDERR: ", self.stderr_buffer, self.original_stderr),
    )
    for prefix, buffer, original in streams:
        output = buffer.getvalue()
        # Reset the buffer so the next flush only sees new output.
        # NOTE(review): text written by another thread between getvalue()
        # and truncate() would be dropped; no lock guards the buffer.
        buffer.truncate(0)
        buffer.seek(0)
        if not output:
            continue
        self.log_output(run_id, prefix + output)
        if not original.closed:
            original.write(output + "\n")

def set_terminated(self, run_id, status: Optional[str] = None, end_time: Optional[int] = None):
    """
    Set the run as terminated, and, if needed, stop capturing output.

    Clears ``is_capture_logging`` (the flag polled by the output-capture
    thread) and then delegates to the parent class.

    Args:
        run_id: String ID of the run
        status: Optional final status, forwarded unchanged to the parent
            implementation.
        end_time: Optional end time, forwarded unchanged to the parent
            implementation.

    .. code-block:: python
        :caption: Example

        from fasttrackml import FasttrackmlClient

        # Create a run under the default experiment (whose id is '0').
        # Since these are low-level CRUD operations, this method will create a run.
        # To end the run, you'll have to explicitly end it.
        client = FasttrackmlClient()
        experiment_id = "0"
        run = client.create_run(experiment_id)
        print_run_info(run)
        print("--")

        # Log some output
        client.init_output_logging(run.info.run_id)
        print("This is just some output we want to capture")
        client.set_terminated(run.info.run_id)
    """
    self.is_capture_logging = False
    # NOTE(review): assumes the parent accepts `status` and `end_time`
    # keywords as MlflowClient.set_terminated does - confirm.
    super().set_terminated(run_id, status=status, end_time=end_time)

def log_metric(
self,
run_id: str,
Expand Down Expand Up @@ -338,3 +428,36 @@ def get_metric_histories(
experiment_names,
context,
)

def log_output(self, run_id: str, data: str) -> None:
    """
    Send an explicit string to the given run; it will be viewable in the
    Run detail > Logs tab.

    Args:
        run_id: String ID of the run
        data: The data to log

    .. code-block:: python
        :caption: Example

        from fasttrackml import FasttrackmlClient

        # Create a run under the default experiment (whose id is '0').
        # Since these are low-level CRUD operations, this method will create a run.
        # To end the run, you'll have to explicitly end it.
        client = FasttrackmlClient()
        experiment_id = "0"
        run = client.create_run(experiment_id)
        print_run_info(run)
        print("--")

        # Log some output
        client.log_output(run.info.run_id, "This is just some output we want to capture")
        client.set_terminated(run.info.run_id)
    """
    tracking_client = self._tracking_client
    tracking_client.log_output(run_id, data)
23 changes: 23 additions & 0 deletions python/fasttrackml/store/custom_rest_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,26 @@ def get_metric_histories(

with pa.ipc.open_stream(result.raw) as reader:
return reader.read_pandas().set_index(["run_id", "key", index])

def log_output(self, run_id, data):
    """POST ``data`` as log output of ``run_id`` to the tracking server.

    Args:
        run_id: String ID of the run.
        data: The text to log.

    Returns:
        The HTTP response object when the server answers with status 200.

    Raises:
        MlflowException: If the server answers with any other status. When
            the response body is JSON carrying an ``error_code``, that code
            and the accompanying ``message`` are used; otherwise the
            exception reports the HTTP status and raw body.
    """
    result = http_request(
        host_creds=self.get_host_creds(),
        endpoint="/api/2.0/mlflow/runs/log-output",
        method="POST",
        json={"run_id": run_id, "data": data},
    )
    if result.status_code == 200:
        return result

    # Error bodies are not guaranteed to be JSON (e.g. a proxy error page).
    try:
        payload = result.json()
    except ValueError:
        payload = None

    if isinstance(payload, dict) and "error_code" in payload:
        raise MlflowException(
            message=payload.get("message", ""),
            error_code=payload["error_code"],
        )
    # A non-200 answer must not be reported as success just because it does
    # not follow the structured error format.
    raise MlflowException(
        message=f"log-output request failed with HTTP status {result.status_code}: {result.text}",
    )

0 comments on commit 560fbf7

Please sign in to comment.