Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor model class and runners to be more independent #494

Merged
merged 38 commits into from
Jan 31, 2025
Merged
Changes from 26 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
8df6acb
get model class from config
deigen Jan 22, 2025
622461c
download checkpoints command
deigen Jan 22, 2025
1fc5c22
pass model to server class, change grpc flag name
deigen Jan 22, 2025
cb9f59c
download_checkpoints option in server
deigen Jan 22, 2025
9fe3cf5
rename ModelUploader to ModelBuilder
deigen Jan 23, 2025
134e026
create_model_instance function
deigen Jan 23, 2025
a29661e
include main for now to reduce diff
deigen Jan 23, 2025
c612812
refactor upload model main to reduce diff
deigen Jan 23, 2025
24c2595
fix arg
deigen Jan 23, 2025
f2db76f
fixes
deigen Jan 23, 2025
ddc384a
fix formatting
deigen Jan 23, 2025
2312951
make class_info section optional
deigen Jan 23, 2025
921ad10
edit unittest
deigen Jan 23, 2025
a05a8b3
edit tests to use model class
deigen Jan 23, 2025
e851890
init model directly for unittest
deigen Jan 24, 2025
86a07f5
update cmdline
deigen Jan 24, 2025
991ede3
Merge branch 'master' into model-class-refactor
deigen Jan 24, 2025
b1337d7
fix runner inheritence and simplify test
deigen Jan 24, 2025
7a4a8a9
update calls in grpc server
deigen Jan 24, 2025
ee5987e
update names in test
deigen Jan 24, 2025
8fd2217
fix test
deigen Jan 24, 2025
556ab26
fix test
deigen Jan 24, 2025
1123b63
formatting
deigen Jan 24, 2025
1e20682
fix name in test
deigen Jan 24, 2025
e317b91
fix use of modelclass in tests
deigen Jan 24, 2025
829a55a
move load_model into default construction
deigen Jan 24, 2025
a3738a4
Merge branch 'master' into model-class-refactor
deigen Jan 27, 2025
ed00d68
remove download_checkpoints arg
deigen Jan 27, 2025
ab52d07
set uuid in test setup
deigen Jan 27, 2025
12daf0e
write configs to tempfiles to keep tests independent
deigen Jan 27, 2025
7f34602
set compute cluster id in nodepool test
deigen Jan 27, 2025
27fb1ec
revert unrelated changes to test_compute_orchestration
deigen Jan 29, 2025
7cb44d5
add error message for updaing runner class
deigen Jan 29, 2025
5fe1e7a
put back load_model as abstract method
deigen Jan 29, 2025
2d9af83
remove init args for this pr
deigen Jan 29, 2025
cd45884
fix tests
deigen Jan 29, 2025
9974c4d
copy all of 1/ dir
deigen Jan 31, 2025
c0716ba
Merge branch 'master' into model-class-refactor
deigen Jan 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions clarifai/cli/model.py
Original file line number Diff line number Diff line change
@@ -29,9 +29,20 @@ def model():
)
def upload(model_path, download_checkpoints, skip_dockerfile):
"""Upload a model to Clarifai."""
from clarifai.runners.models import model_upload
from clarifai.runners.models.model_builder import upload_model
upload_model(model_path, download_checkpoints, skip_dockerfile)

model_upload.main(model_path, download_checkpoints, skip_dockerfile)

@model.command()
@click.option(
'--model_path',
type=click.Path(exists=True),
required=True,
help='Path to the model directory.')
def download_checkpoints(model_path):
"""Download remote checkpoints that are specified in the config."""
from clarifai.runners.models.model_builder import ModelBuilder
ModelBuilder(model_path).download_checkpoints()


@model.command()
4 changes: 2 additions & 2 deletions clarifai/runners/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from .models.base_typed_model import AnyAnyModel, TextInputModel, VisualInputModel
from .models.model_builder import ModelBuilder
from .models.model_runner import ModelRunner
from .models.model_upload import ModelUploader
from .utils.data_handler import InputDataHandler, OutputDataHandler

__all__ = [
"ModelRunner",
"ModelUploader",
"ModelBuilder",
"InputDataHandler",
"OutputDataHandler",
"AnyAnyModel",
4 changes: 2 additions & 2 deletions clarifai/runners/models/base_typed_model.py
Original file line number Diff line number Diff line change
@@ -7,10 +7,10 @@
from google.protobuf import json_format

from ..utils.data_handler import InputDataHandler, OutputDataHandler
from .model_runner import ModelRunner
from .model_class import ModelClass


class AnyAnyModel(ModelRunner):
class AnyAnyModel(ModelClass):

def load_model(self):
"""
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import importlib
import inspect
import os
import re
import sys
@@ -13,6 +15,7 @@
from rich.markup import escape

from clarifai.client import BaseClient
from clarifai.runners.models.model_class import ModelClass
from clarifai.runners.utils.const import (AVAILABLE_PYTHON_IMAGES, AVAILABLE_TORCH_IMAGES,
CONCEPTS_REQUIRED_MODEL_TYPE, DEFAULT_PYTHON_VERSION,
PYTHON_BASE_IMAGE, TORCH_BASE_IMAGE)
@@ -28,7 +31,7 @@ def _clear_line(n: int = 1) -> None:
print(LINE_UP, end=LINE_CLEAR, flush=True)


class ModelUploader:
class ModelBuilder:

def __init__(self, folder: str, validate_api_ids: bool = True, download_validation_only=False):
"""
@@ -52,6 +55,56 @@ def __init__(self, folder: str, validate_api_ids: bool = True, download_validati
self.inference_compute_info = self._get_inference_compute_info()
self.is_v3 = True # Do model build for v3

def create_model_instance(self, load_model=True):
"""
Create an instance of the model class, as specified in the config file.
"""
class_config = self.config.get("class_info", {})

model_file = class_config.get("file_path")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These parameters would provide much greater flexibility in defining the model. However, I'm uncertain whether we should include these additional parameters in the config.yaml file or establish a fixed structure for defining the model

Could this potentially confuse the users?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the main reason not to allow this, would be if we don't want to support and maintain these options. However if we allow init args in the config, it feels odd and incomplete without also being able to specify module and class. I found it convenient to be able to specify args in particular, but also the model file for tests. Note the defaults are what we have now, where it looks for a ModelClass in model.py.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, less stuff to customize in config.yaml is better I think. Right now we expect 1/model.py as the model file; I think that can at least be the default here and then let people override it.

We also need to badly get this config spec into a proto so it's defined what the fields are and we can parse it from the proto. Otherwise we'll have what we had before with config maps which got out of hand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed them for now. We can figure out a way to better include init args in a separate PR.

if model_file:
model_file = os.path.join(self.folder, model_file)
if not os.path.exists(model_file):
raise Exception(f"Model file {model_file} does not exist.")
else:
# look for default model.py file location
for loc in ["model.py", "1/model.py"]:
model_file = os.path.join(self.folder, loc)
if os.path.exists(model_file):
break
if not os.path.exists(model_file):
raise Exception("Model file not found.")

module_name = os.path.basename(model_file).replace(".py", "")

spec = importlib.util.spec_from_file_location(module_name, model_file)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)

if class_config.get("class_name"):
model_class = getattr(module, class_config["class_name"])
else:
# Find all classes in the model.py file that are subclasses of ModelClass
classes = [
cls for _, cls in inspect.getmembers(module, inspect.isclass)
if issubclass(cls, ModelClass) and cls.__module__ == module.__name__
]
# Ensure there is exactly one subclass of ModelClass in the model.py file
if len(classes) != 1:
raise Exception(
"Could not determine model class. Please specify it in the config with class_info.class_name."
)
model_class = classes[0]

model_args = class_config.get("args", {})
Copy link
Contributor

@luv-bansal luv-bansal Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, what these args could be for initialising model class?
because we are not using any __init__ method in ModelClass

Copy link
Contributor Author

@deigen deigen Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should allow init args for specifying checkpoint files, paths to config files or other params. This would allow us to more easily init models for testing to point them at different paths. The other option would be to hardcode paths in a top-level main python file, and use those to init another model that has init args, but I like being able to specify them along with the rest of the config. That way we can write those configs with the rest of the model config, and not require to write a templated python file as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it's a model specific thing why not just put it in model.py?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or these args are the inspected args of the python function and not something we put into config.yaml, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to be able to pass in some config args to the model, like checkpoint_path or potentially preprocessing params or a path to another config file. That way when you're developing, you can init the model directly with different settings, and deploy it with a particular set of args for the model upload.

The alternative, would be to essentially make model.py a config file written in python with some extra boilerplate, rather than have the model definition. That would turn model.py into something like the following, which I think is worse than putting args in the config.yaml.

import yaml
from my_model import MyModel

model_config_file = 'model_config.yaml'
model_config = yaml.safe_load(open(model_config_file))

class MyModelClass(MyModel):
    def __init__(self):
        super().__init__(**model_config)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, it seems we're a little unsure of how to handle these sorts of things now. I think allowing args or a path to another config file for init would be best. Otherwise we'll end up with boilerplate python files that are config files. But we can try to do that in a separate PR, and I can limit this one to just the ModelClass inheritance.


# initialize the model class with the args.
model = model_class(**model_args)
if load_model:
model.load_model()
return model

def _validate_folder(self, folder):
if folder == ".":
folder = "" # will getcwd() next which ends with /
@@ -564,19 +617,19 @@ def monitor_model_build(self):
return False


def main(folder, download_checkpoints, skip_dockerfile):
uploader = ModelUploader(folder)
def upload_model(folder, download_checkpoints, skip_dockerfile):
builder = ModelBuilder(folder)
if download_checkpoints:
uploader.download_checkpoints()
builder.download_checkpoints()
if not skip_dockerfile:
uploader.create_dockerfile()
exists = uploader.check_model_exists()
builder.create_dockerfile()
exists = builder.check_model_exists()
if exists:
logger.info(
f"Model already exists at {uploader.model_url}, this upload will create a new version for it."
f"Model already exists at {builder.model_url}, this upload will create a new version for it."
)
else:
logger.info(f"New model will be created at {builder.model_url} with its first version.")
logger.info(f"New model will be created at {builder.model_url} with it's first version.")

input("Press Enter to continue...")
uploader.upload_model_version(download_checkpoints)
builder.upload_model_version(download_checkpoints)
3 changes: 1 addition & 2 deletions clarifai/runners/models/model_class.py
Original file line number Diff line number Diff line change
@@ -21,9 +21,8 @@ def stream_wrapper(self, request: service_pb2.PostModelOutputsRequest
"""This method is used for input/output proto data conversion and yield outcome"""
return self.stream(request)

@abstractmethod
def load_model(self):
raise NotImplementedError("load_model() not implemented")
pass
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we want to guide people into always having to implement this, even if their implementation is "pass"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above. I don't know if we need this now. However, I'll change back to NotImplementedError for now, and the examples can keep the checkpoint loads there to guide users to that, as you mention.


@abstractmethod
def predict(self,
73 changes: 17 additions & 56 deletions clarifai/runners/models/model_run_locally.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import hashlib
import importlib.util
import inspect
import os
import platform
import shutil
@@ -14,9 +12,8 @@

from clarifai_grpc.grpc.api import resources_pb2, service_pb2
from clarifai_grpc.grpc.api.status import status_code_pb2, status_pb2
from clarifai_protocol import BaseRunner

from clarifai.runners.models.model_upload import ModelUploader
from clarifai.runners.models.model_builder import ModelBuilder
from clarifai.runners.utils.url_fetcher import ensure_urls_downloaded
from clarifai.utils.logging import logger

@@ -27,9 +24,9 @@ def __init__(self, model_path):
self.model_path = model_path
self.requirements_file = os.path.join(self.model_path, "requirements.txt")

# ModelUploader contains multiple useful methods to interact with the model
self.uploader = ModelUploader(self.model_path, download_validation_only=True)
self.config = self.uploader.config
# ModelBuilder contains multiple useful methods to interact with the model
self.builder = ModelBuilder(self.model_path, download_validation_only=True)
self.config = self.builder.config

def _requirements_hash(self):
"""Generate a hash of the requirements file."""
@@ -91,38 +88,10 @@ def install_requirements(self):
self.clean_up()
sys.exit(1)

def _get_model_runner(self):
"""Dynamically import the runner class from the model file."""

# import the runner class that to be implement by the user
runner_path = os.path.join(self.model_path, "1", "model.py")

# arbitrary name given to the module to be imported
module = "runner_module"

spec = importlib.util.spec_from_file_location(module, runner_path)
runner_module = importlib.util.module_from_spec(spec)
sys.modules[module] = runner_module
spec.loader.exec_module(runner_module)

# Find all classes in the model.py file that are subclasses of BaseRunner
classes = [
cls for _, cls in inspect.getmembers(runner_module, inspect.isclass)
if issubclass(cls, BaseRunner) and cls.__module__ == runner_module.__name__
]

# Ensure there is exactly one subclass of BaseRunner in the model.py file
if len(classes) != 1:
raise Exception("Expected exactly one subclass of BaseRunner, found: {}".format(
len(classes)))

MyRunner = classes[0]
return MyRunner

def _build_request(self):
"""Create a mock inference request for testing the model."""

model_version_proto = self.uploader.get_model_version_proto()
model_version_proto = self.builder.get_model_version_proto()
model_version_proto.id = "model_version"

return service_pb2.PostModelOutputsRequest(
@@ -142,8 +111,8 @@ def _build_stream_request(self):
for i in range(1):
yield request

def _run_model_inference(self, runner):
"""Perform inference using the runner."""
def _run_model_inference(self, model):
"""Perform inference using the model."""
request = self._build_request()
stream_request = self._build_stream_request()

@@ -152,7 +121,7 @@ def _run_model_inference(self, runner):
generate_response = None
stream_response = None
try:
predict_response = runner.predict(request)
predict_response = model.predict(request)
except NotImplementedError:
logger.info("Model does not implement predict() method.")
except Exception as e:
@@ -172,7 +141,7 @@ def _run_model_inference(self, runner):
logger.info(f"Model Prediction succeeded: {predict_response}")

try:
generate_response = runner.generate(request)
generate_response = model.generate(request)
except NotImplementedError:
logger.info("Model does not implement generate() method.")
except Exception as e:
@@ -194,7 +163,7 @@ def _run_model_inference(self, runner):
f"Model Prediction succeeded for generate and first response: {generate_first_res}")

try:
stream_response = runner.stream(stream_request)
stream_response = model.stream(stream_request)
except NotImplementedError:
logger.info("Model does not implement stream() method.")
except Exception as e:
@@ -217,16 +186,10 @@ def _run_model_inference(self, runner):

def _run_test(self):
"""Test the model locally by making a prediction."""
# construct MyRunner which will call load_model()
MyRunner = self._get_model_runner()
runner = MyRunner(
runner_id="n/a",
nodepool_id="n/a",
compute_cluster_id="n/a",
user_id="n/a",
)
# Create the model
model = self.builder.create_model_instance()
# send an inference.
self._run_model_inference(runner)
self._run_model_inference(model)

def test_model(self):
"""Test the model by running it locally in the virtual environment."""
@@ -274,7 +237,7 @@ def run_model_server(self, port=8080):

command = [
self.python_executable, "-m", "clarifai.runners.server", "--model_path", self.model_path,
"--start_dev_server", "--port",
"--grpc", "--port",
str(port)
]
try:
@@ -383,9 +346,7 @@ def run_docker_container(self,
# Add the image name
cmd.append(image_name)
# update the CMD to run the server
cmd.extend(
["--model_path", "/app/model_dir/main", "--start_dev_server", "--port",
str(port)])
cmd.extend(["--model_path", "/app/model_dir/main", "--grpc", "--port", str(port)])
# Run the container
process = subprocess.Popen(cmd,)
logger.info(
@@ -518,11 +479,11 @@ def main(model_path,
)
sys.exit(1)
manager = ModelRunLocally(model_path)
manager.uploader.download_checkpoints()
manager.builder.download_checkpoints()
if inside_container:
if not manager.is_docker_installed():
sys.exit(1)
manager.uploader.create_dockerfile()
manager.builder.create_dockerfile()
image_tag = manager._docker_hash()
image_name = f"{manager.config['model']['id']}:{image_tag}"
container_name = manager.config['model']['id']
14 changes: 6 additions & 8 deletions clarifai/runners/models/model_runner.py
Original file line number Diff line number Diff line change
@@ -10,16 +10,14 @@
from .model_class import ModelClass


class ModelRunner(BaseRunner, ModelClass, HealthProbeRequestHandler):
class ModelRunner(BaseRunner, HealthProbeRequestHandler):
"""
This is a subclass of the runner class which will handle only the work items relevant to models.

It is also a subclass of ModelClass so that any subclass of ModelRunner will need to just
implement predict(), generate() and stream() methods and load_model() if needed.
"""

def __init__(
self,
model: ModelClass,
runner_id: str,
nodepool_id: str,
compute_cluster_id: str,
@@ -43,7 +41,7 @@ def __init__(
num_parallel_polls,
**kwargs,
)
self.load_model()
self.model = model
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where does model.load_model() get called now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That happens here, when the model is constructed. I'm actually not sure if we need to separate __init__ from load_model now; we could also have it load in its init. To me it looks like the reason we didn't do that in the old class structure was that the init function was for the Runner, not the Model, and so had all the params for runner ids etc. as its init args, which we didn't want the user to have to deal with in their implementation. Now that the model is not inheriting from runner, we can use init instead and remove load_model. The only advantage to keeping it would be if we wanted to allow construction before checkpoint loading, e.g. for examining config without loading the checkpoint; there isn't a place we do that yet, though.

https://github.com/Clarifai/clarifai-python/pull/494/files#diff-265e698c90224658ed2a2213722fc5ddac5cec98e352802afcbf75760789e383R105

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah i think we should keep load_model to make it more clear what you have to implement as a user.


# After model load successfully set the health probe to ready and startup
HealthProbeRequestHandler.is_ready = True
@@ -83,7 +81,7 @@ def runner_item_predict(self,
request = runner_item.post_model_outputs_request
ensure_urls_downloaded(request)

resp = self.predict_wrapper(request)
resp = self.model.predict_wrapper(request)
successes = [o.status.code == status_code_pb2.SUCCESS for o in resp.outputs]
if all(successes):
status = status_pb2.Status(
@@ -113,7 +111,7 @@ def runner_item_generate(
request = runner_item.post_model_outputs_request
ensure_urls_downloaded(request)

for resp in self.generate_wrapper(request):
for resp in self.model.generate_wrapper(request):
successes = []
for output in resp.outputs:
if not output.HasField('status') or not output.status.code:
@@ -141,7 +139,7 @@ def runner_item_generate(
def runner_item_stream(self, runner_item_iterator: Iterator[service_pb2.RunnerItem]
) -> Iterator[service_pb2.RunnerItemOutput]:
# Call the generate() method the underlying model implements.
for resp in self.stream_wrapper(pmo_iterator(runner_item_iterator)):
for resp in self.model.stream_wrapper(pmo_iterator(runner_item_iterator)):
successes = []
for output in resp.outputs:
if not output.HasField('status') or not output.status.code:
Loading
Loading