Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor model class and runners to be more independent #494

Merged
merged 38 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
8df6acb
get model class from config
deigen Jan 22, 2025
622461c
download checkpoints command
deigen Jan 22, 2025
1fc5c22
pass model to server class, change grpc flag name
deigen Jan 22, 2025
cb9f59c
download_checkpoints option in server
deigen Jan 22, 2025
9fe3cf5
rename ModelUploader to ModelBuilder
deigen Jan 23, 2025
134e026
create_model_instance function
deigen Jan 23, 2025
a29661e
include main for now to reduce diff
deigen Jan 23, 2025
c612812
refactor upload model main to reduce diff
deigen Jan 23, 2025
24c2595
fix arg
deigen Jan 23, 2025
f2db76f
fixes
deigen Jan 23, 2025
ddc384a
fix formatting
deigen Jan 23, 2025
2312951
make class_info section optional
deigen Jan 23, 2025
921ad10
edit unittest
deigen Jan 23, 2025
a05a8b3
edit tests to use model class
deigen Jan 23, 2025
e851890
init model directly for unittest
deigen Jan 24, 2025
86a07f5
update cmdline
deigen Jan 24, 2025
991ede3
Merge branch 'master' into model-class-refactor
deigen Jan 24, 2025
b1337d7
fix runner inheritance and simplify test
deigen Jan 24, 2025
7a4a8a9
update calls in grpc server
deigen Jan 24, 2025
ee5987e
update names in test
deigen Jan 24, 2025
8fd2217
fix test
deigen Jan 24, 2025
556ab26
fix test
deigen Jan 24, 2025
1123b63
formatting
deigen Jan 24, 2025
1e20682
fix name in test
deigen Jan 24, 2025
e317b91
fix use of modelclass in tests
deigen Jan 24, 2025
829a55a
move load_model into default construction
deigen Jan 24, 2025
a3738a4
Merge branch 'master' into model-class-refactor
deigen Jan 27, 2025
ed00d68
remove download_checkpoints arg
deigen Jan 27, 2025
ab52d07
set uuid in test setup
deigen Jan 27, 2025
12daf0e
write configs to tempfiles to keep tests independent
deigen Jan 27, 2025
7f34602
set compute cluster id in nodepool test
deigen Jan 27, 2025
27fb1ec
revert unrelated changes to test_compute_orchestration
deigen Jan 29, 2025
7cb44d5
add error message for updating runner class
deigen Jan 29, 2025
5fe1e7a
put back load_model as abstract method
deigen Jan 29, 2025
2d9af83
remove init args for this pr
deigen Jan 29, 2025
cd45884
fix tests
deigen Jan 29, 2025
9974c4d
copy all of 1/ dir
deigen Jan 31, 2025
c0716ba
Merge branch 'master' into model-class-refactor
deigen Jan 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions clarifai/cli/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ def model():
)
def upload(model_path, download_checkpoints, skip_dockerfile):
"""Upload a model to Clarifai."""
from clarifai.runners.models import model_upload

model_upload.main(model_path, download_checkpoints, skip_dockerfile)
from clarifai.runners.models.model_builder import upload_model
upload_model(model_path, download_checkpoints, skip_dockerfile)


@model.command()
Expand All @@ -50,9 +49,9 @@ def upload(model_path, download_checkpoints, skip_dockerfile):
def download_checkpoints(model_path, out_path):
"""Download checkpoints from external source to local model_path"""

from clarifai.runners.models.model_upload import ModelUploader
uploader = ModelUploader(model_path, download_validation_only=True)
uploader.download_checkpoints(out_path)
from clarifai.runners.models.model_builder import ModelBuilder
builder = ModelBuilder(model_path, download_validation_only=True)
builder.download_checkpoints(out_path)


@model.command()
Expand Down
4 changes: 2 additions & 2 deletions clarifai/runners/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from .models.base_typed_model import AnyAnyModel, TextInputModel, VisualInputModel
from .models.model_builder import ModelBuilder
from .models.model_runner import ModelRunner
from .models.model_upload import ModelUploader
from .utils.data_handler import InputDataHandler, OutputDataHandler

__all__ = [
"ModelRunner",
"ModelUploader",
"ModelBuilder",
"InputDataHandler",
"OutputDataHandler",
"AnyAnyModel",
Expand Down
4 changes: 2 additions & 2 deletions clarifai/runners/models/base_typed_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
from google.protobuf import json_format

from ..utils.data_handler import InputDataHandler, OutputDataHandler
from .model_runner import ModelRunner
from .model_class import ModelClass


class AnyAnyModel(ModelRunner):
class AnyAnyModel(ModelClass):

def load_model(self):
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import importlib
import inspect
import os
import re
import sys
Expand All @@ -13,6 +15,7 @@
from rich.markup import escape

from clarifai.client import BaseClient
from clarifai.runners.models.model_class import ModelClass
from clarifai.runners.utils.const import (
AVAILABLE_PYTHON_IMAGES, AVAILABLE_TORCH_IMAGES, CONCEPTS_REQUIRED_MODEL_TYPE,
DEFAULT_PYTHON_VERSION, PYTHON_BUILDER_IMAGE, PYTHON_RUNTIME_IMAGE, TORCH_BASE_IMAGE)
Expand All @@ -28,7 +31,7 @@ def _clear_line(n: int = 1) -> None:
print(LINE_UP, end=LINE_CLEAR, flush=True)


class ModelUploader:
class ModelBuilder:

def __init__(self, folder: str, validate_api_ids: bool = True, download_validation_only=False):
"""
Expand All @@ -52,6 +55,55 @@ def __init__(self, folder: str, validate_api_ids: bool = True, download_validati
self.inference_compute_info = self._get_inference_compute_info()
self.is_v3 = True # Do model build for v3

def create_model_instance(self, load_model=True):
    """
    Create an instance of the model class defined in the model folder's model.py.

    The model file is looked up at ``<folder>/model.py`` first and then at
    ``<folder>/1/model.py``. It is imported as a standalone module and must
    define exactly one subclass of ModelClass.

    Args:
      load_model: when True (the default), ``load_model()`` is called on the
        new instance before it is returned.

    Returns:
      An instance of the single ModelClass subclass defined in model.py.

    Raises:
      Exception: if no model.py is found, or if model.py does not define
        exactly one ModelClass subclass. A dedicated message is raised when the
        file still uses the old structure of inheriting from ModelRunner.
    """
    # `import importlib` alone does not guarantee that the `util` submodule is
    # loaded, so import it explicitly before using importlib.util below.
    import importlib.util

    # look for default model.py file location
    candidates = [os.path.join(self.folder, loc) for loc in ("model.py", "1/model.py")]
    model_file = next((path for path in candidates if os.path.exists(path)), None)
    if model_file is None:
        raise Exception("Model file not found.")

    module_name = os.path.splitext(os.path.basename(model_file))[0]

    spec = importlib.util.spec_from_file_location(module_name, model_file)
    module = importlib.util.module_from_spec(spec)
    # Register before executing so code in model.py can resolve its own module.
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialised module behind if model.py fails to run.
        sys.modules.pop(module_name, None)
        raise

    # Only consider classes defined in model.py itself, not ones imported into it.
    local_classes = [
        cls for _, cls in inspect.getmembers(module, inspect.isclass)
        if cls.__module__ == module.__name__
    ]
    classes = [cls for cls in local_classes if issubclass(cls, ModelClass)]

    # Ensure there is exactly one subclass of ModelClass in the model.py file
    if len(classes) != 1:
        # check for old inheritance structure, ModelRunner used to be a ModelClass
        runner_classes = [
            cls for cls in local_classes
            if any(base.__name__ == 'ModelRunner' for base in cls.__bases__)
        ]
        if len(runner_classes) == 1:
            raise Exception(
                f'Could not determine model class.'
                f' Models should now inherit from {ModelClass.__module__}.ModelClass, not ModelRunner.'
                f' Please update your model "{runner_classes[0].__name__}" to inherit from ModelClass.'
            )
        raise Exception(
            "Could not determine model class. There should be exactly one model inheriting from ModelClass defined in the model.py"
        )
    model_class = classes[0]

    # initialize the model
    model = model_class()
    if load_model:
        model.load_model()
    return model

def _validate_folder(self, folder):
if folder == ".":
folder = "" # will getcwd() next which ends with /
Expand Down Expand Up @@ -589,19 +641,19 @@ def monitor_model_build(self):
return False


def main(folder, download_checkpoints, skip_dockerfile):
uploader = ModelUploader(folder)
def upload_model(folder, download_checkpoints, skip_dockerfile):
builder = ModelBuilder(folder)
if download_checkpoints:
uploader.download_checkpoints()
builder.download_checkpoints()
if not skip_dockerfile:
uploader.create_dockerfile()
exists = uploader.check_model_exists()
builder.create_dockerfile()
exists = builder.check_model_exists()
if exists:
logger.info(
f"Model already exists at {uploader.model_url}, this upload will create a new version for it."
f"Model already exists at {builder.model_url}, this upload will create a new version for it."
)
else:
logger.info(f"New model will be created at {uploader.model_url} with it's first version.")
logger.info(f"New model will be created at {builder.model_url} with it's first version.")

input("Press Enter to continue...")
uploader.upload_model_version(download_checkpoints)
builder.upload_model_version(download_checkpoints)
74 changes: 17 additions & 57 deletions clarifai/runners/models/model_run_locally.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import hashlib
import importlib.util
import inspect
import os
import platform
import shutil
Expand All @@ -14,9 +12,8 @@

from clarifai_grpc.grpc.api import resources_pb2, service_pb2
from clarifai_grpc.grpc.api.status import status_code_pb2, status_pb2
from clarifai_protocol import BaseRunner

from clarifai.runners.models.model_upload import ModelUploader
from clarifai.runners.models.model_builder import ModelBuilder
from clarifai.runners.utils.url_fetcher import ensure_urls_downloaded
from clarifai.utils.logging import logger

Expand All @@ -27,9 +24,9 @@ def __init__(self, model_path):
self.model_path = model_path
self.requirements_file = os.path.join(self.model_path, "requirements.txt")

# ModelUploader contains multiple useful methods to interact with the model
self.uploader = ModelUploader(self.model_path, download_validation_only=True)
self.config = self.uploader.config
# ModelBuilder contains multiple useful methods to interact with the model
self.builder = ModelBuilder(self.model_path, download_validation_only=True)
self.config = self.builder.config

def _requirements_hash(self):
"""Generate a hash of the requirements file."""
Expand Down Expand Up @@ -91,38 +88,10 @@ def install_requirements(self):
self.clean_up()
sys.exit(1)

def _get_model_runner(self):
    """Load the user's model file and return the single BaseRunner subclass it defines."""
    # the user's implementation is expected at <model_path>/1/model.py
    model_file = os.path.join(self.model_path, "1", "model.py")

    # arbitrary name under which the file is imported as a module
    module_name = "runner_module"

    module_spec = importlib.util.spec_from_file_location(module_name, model_file)
    loaded = importlib.util.module_from_spec(module_spec)
    sys.modules[module_name] = loaded
    module_spec.loader.exec_module(loaded)

    # Collect the BaseRunner subclasses that are defined in model.py itself
    # (classes merely imported into the file are ignored).
    candidates = []
    for _, member in inspect.getmembers(loaded, inspect.isclass):
        if issubclass(member, BaseRunner) and member.__module__ == loaded.__name__:
            candidates.append(member)

    # The file must define exactly one runner class.
    found = len(candidates)
    if found != 1:
        raise Exception("Expected exactly one subclass of BaseRunner, found: {}".format(found))

    return candidates[0]

def _build_request(self):
"""Create a mock inference request for testing the model."""

model_version_proto = self.uploader.get_model_version_proto()
model_version_proto = self.builder.get_model_version_proto()
model_version_proto.id = "model_version"

return service_pb2.PostModelOutputsRequest(
Expand All @@ -142,8 +111,8 @@ def _build_stream_request(self):
for i in range(1):
yield request

def _run_model_inference(self, runner):
"""Perform inference using the runner."""
def _run_model_inference(self, model):
"""Perform inference using the model."""
request = self._build_request()
stream_request = self._build_stream_request()

Expand All @@ -152,7 +121,7 @@ def _run_model_inference(self, runner):
generate_response = None
stream_response = None
try:
predict_response = runner.predict(request)
predict_response = model.predict(request)
except NotImplementedError:
logger.info("Model does not implement predict() method.")
except Exception as e:
Expand All @@ -172,7 +141,7 @@ def _run_model_inference(self, runner):
logger.info(f"Model Prediction succeeded: {predict_response}")

try:
generate_response = runner.generate(request)
generate_response = model.generate(request)
except NotImplementedError:
logger.info("Model does not implement generate() method.")
except Exception as e:
Expand All @@ -194,7 +163,7 @@ def _run_model_inference(self, runner):
f"Model Prediction succeeded for generate and first response: {generate_first_res}")

try:
stream_response = runner.stream(stream_request)
stream_response = model.stream(stream_request)
except NotImplementedError:
logger.info("Model does not implement stream() method.")
except Exception as e:
Expand All @@ -217,17 +186,10 @@ def _run_model_inference(self, runner):

def _run_test(self):
"""Test the model locally by making a prediction."""
# construct MyRunner which will call load_model()
MyRunner = self._get_model_runner()
runner = MyRunner(
runner_id="n/a",
nodepool_id="n/a",
compute_cluster_id="n/a",
user_id="n/a",
health_check_port=None,
)
# Create the model
model = self.builder.create_model_instance()
# send an inference.
self._run_model_inference(runner)
self._run_model_inference(model)

def test_model(self):
"""Test the model by running it locally in the virtual environment."""
Expand Down Expand Up @@ -275,7 +237,7 @@ def run_model_server(self, port=8080):

command = [
self.python_executable, "-m", "clarifai.runners.server", "--model_path", self.model_path,
"--start_dev_server", "--port",
"--grpc", "--port",
str(port)
]
try:
Expand Down Expand Up @@ -384,9 +346,7 @@ def run_docker_container(self,
# Add the image name
cmd.append(image_name)
# update the CMD to run the server
cmd.extend(
["--model_path", "/app/model_dir/main", "--start_dev_server", "--port",
str(port)])
cmd.extend(["--model_path", "/app/model_dir/main", "--grpc", "--port", str(port)])
# Run the container
process = subprocess.Popen(cmd,)
logger.info(
Expand Down Expand Up @@ -519,11 +479,11 @@ def main(model_path,
)
sys.exit(1)
manager = ModelRunLocally(model_path)
manager.uploader.download_checkpoints()
manager.builder.download_checkpoints()
if inside_container:
if not manager.is_docker_installed():
sys.exit(1)
manager.uploader.create_dockerfile()
manager.builder.create_dockerfile()
image_tag = manager._docker_hash()
image_name = f"{manager.config['model']['id']}:{image_tag}"
container_name = manager.config['model']['id']
Expand Down
14 changes: 6 additions & 8 deletions clarifai/runners/models/model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,14 @@
from .model_class import ModelClass


class ModelRunner(BaseRunner, ModelClass, HealthProbeRequestHandler):
class ModelRunner(BaseRunner, HealthProbeRequestHandler):
"""
This is a subclass of the runner class which will handle only the work items relevant to models.

It is also a subclass of ModelClass so that any subclass of ModelRunner will need to just
implement predict(), generate() and stream() methods and load_model() if needed.
"""

def __init__(
self,
model: ModelClass,
runner_id: str,
nodepool_id: str,
compute_cluster_id: str,
Expand All @@ -43,7 +41,7 @@ def __init__(
num_parallel_polls,
**kwargs,
)
self.load_model()
self.model = model
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where does model.load_model() get called now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That happens here, when the model is constructed. I'm actually not sure if we need to separate __init__ from load_model now; we could also have it load in its init. To me it looks like the reason we didn't do that in the old class structure was that the init function was for the Runner, not the Model, and so had all the params for runner ids etc. as its init args, which we didn't want the user to have to deal with in their implementation. Now that the model is not inheriting from runner, we can use init instead and remove load_model. The only advantage to keeping it would be if we wanted to allow for construction before checkpoint loading, e.g. for examining config without loading the checkpoint; there isn't a place we do that yet, though.

https://github.com/Clarifai/clarifai-python/pull/494/files#diff-265e698c90224658ed2a2213722fc5ddac5cec98e352802afcbf75760789e383R105

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah i think we should keep load_model to make it more clear what you have to implement as a user.


# After model load successfully set the health probe to ready and startup
HealthProbeRequestHandler.is_ready = True
Expand Down Expand Up @@ -83,7 +81,7 @@ def runner_item_predict(self,
request = runner_item.post_model_outputs_request
ensure_urls_downloaded(request)

resp = self.predict_wrapper(request)
resp = self.model.predict_wrapper(request)
successes = [o.status.code == status_code_pb2.SUCCESS for o in resp.outputs]
if all(successes):
status = status_pb2.Status(
Expand Down Expand Up @@ -113,7 +111,7 @@ def runner_item_generate(
request = runner_item.post_model_outputs_request
ensure_urls_downloaded(request)

for resp in self.generate_wrapper(request):
for resp in self.model.generate_wrapper(request):
successes = []
for output in resp.outputs:
if not output.HasField('status') or not output.status.code:
Expand Down Expand Up @@ -141,7 +139,7 @@ def runner_item_generate(
def runner_item_stream(self, runner_item_iterator: Iterator[service_pb2.RunnerItem]
) -> Iterator[service_pb2.RunnerItemOutput]:
# Call the generate() method the underlying model implements.
for resp in self.stream_wrapper(pmo_iterator(runner_item_iterator)):
for resp in self.model.stream_wrapper(pmo_iterator(runner_item_iterator)):
successes = []
for output in resp.outputs:
if not output.HasField('status') or not output.status.code:
Expand Down
Loading
Loading