
Added worker for Snowflake's Snowpark Container Services #16393

Open · bjorhn wants to merge 4 commits into main
Conversation

@bjorhn (Contributor) commented Dec 15, 2024

This PR adds a new worker to the prefect-snowflake integration. The worker can run service jobs in Snowflake's Snowpark Container Services. Closes #15674

A few things worth mentioning:

  • Snowflake's Python API currently doesn't support running async service jobs, so I'm temporarily running jobs using SQL commands instead (see the sketch after this list). I will submit a PR to resolve this as soon as Snowflake adds the required features to their Python API.
  • Most of the code was hacked together from the official "custom worker" tutorial and from pieces copied over from the Azure and Kubernetes workers. The same goes for the unit tests.
  • Since a lot of the structure was copied over, let me know if I misunderstood how something should be done. For example, am I using run_sync_in_worker_thread correctly?
  • I don't have a lot of experience creating unit tests in Python, so I've done the bare minimum with regards to unit testing.
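
For reference, the SQL fallback mentioned in the first bullet boils down to something like the statement below. This is only a hedged sketch: the compute pool, job name, and image are placeholders, and the spec the worker actually renders will differ.

EXECUTE JOB SERVICE
  IN COMPUTE POOL ORCHESTRATION
  NAME = COMMON.COMPUTE.PREFECT_FLOW_RUN
  ASYNC = TRUE
  FROM SPECIFICATION $$
  spec:
    containers:
    - name: flow-run
      image: yourorg-youraccount.registry.snowflakecomputing.com/common/compute/prefect/flow:latest
  $$;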

I don't know the best way to test the worker after moving the modules over from our company repo to the Prefect repo. Perhaps someone could test it for me, or leave instructions in a comment. The code should be the same as it was before; only some import paths have changed.

There are two bugs I've bumped into along the way that I don't think are caused by my code. I suspect they only affect custom workers:

@github-actions bot added the enhancement (An improvement of an existing feature) label Dec 15, 2024
This pull request is stale because it has been open 14 days with no activity. To keep this pull request open, remove the stale label or add a comment.

@bjorhn (Contributor, Author) commented Jan 11, 2025

Is anything else required for this to be merged?

@desertaxle (Member) commented, quoting the question above:
Is anything else required for this to be merged?

Hey @bjorhn! Apologies for the delay in review. Could you share some instructions for how to test out this worker? I haven't used Snowpark Container Services before, so some guides or documentation would be very helpful!

@bjorhn (Contributor, Author) commented Jan 23, 2025

Hey @desertaxle! No problem. Give me a week or so and I'll update the PR with testing instructions.

@bjorhn (Contributor, Author) commented Jan 29, 2025

I've "reverse-engineered" some of our setup scripts, cli helper tools etc into something you should be able to use. I haven't tested it so there might be a spelling error or some minor detail missing (such as imports for the Python script) but hopefully it should be relatively straightforward. Let me know otherwise and I'll write more details.

Configure Snowflake

Create a new database and schema for storing the compute pools, images, etc.:

USE ROLE SYSADMIN;

CREATE DATABASE COMMON;
CREATE SCHEMA COMMON.COMPUTE;

Create the COMPUTE role and assign the required grants:

USE ROLE ACCOUNTADMIN;

CREATE ROLE COMPUTE;
GRANT ROLE COMPUTE TO ROLE SYSADMIN;
GRANT ALL ON WAREHOUSE EXPLORATION TO ROLE COMPUTE;  -- replace EXPLORATION with the name of your warehouse

GRANT USAGE ON DATABASE COMMON TO ROLE COMPUTE;
GRANT USAGE, CREATE IMAGE REPOSITORY, CREATE STAGE, CREATE SERVICE, BIND SERVICE ENDPOINT, CREATE TABLE ON SCHEMA COMMON.COMPUTE TO ROLE COMPUTE;
GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE COMPUTE;

Create a compute pool and grant related privileges:

USE ROLE ACCOUNTADMIN;

CREATE COMPUTE POOL ORCHESTRATION
  MIN_NODES = 1
  MAX_NODES = 1
  INSTANCE_FAMILY = CPU_X64_S;

GRANT USAGE, MONITOR ON COMPUTE POOL ORCHESTRATION TO ROLE COMPUTE;

Create an image repository:

USE ROLE ACCOUNTADMIN;

CREATE IMAGE REPOSITORY IF NOT EXISTS COMMON.COMPUTE.PREFECT;

Create Prefect API secret and grant the required privileges on the secret:

USE ROLE ACCOUNTADMIN;

CREATE SECRET COMMON.COMPUTE.PREFECT_API_KEY_SECRET
  TYPE = GENERIC_STRING
  SECRET_STRING = '**redacted**';

GRANT READ ON SECRET COMMON.COMPUTE.PREFECT_API_KEY_SECRET TO ROLE COMPUTE;

Create network rules for egress, since no egress is allowed by default:

USE ROLE ACCOUNTADMIN;

CREATE NETWORK RULE COMMON.COMPUTE.ALLOW_ALL_RULE
  TYPE = 'HOST_PORT'
  MODE = 'EGRESS'
  VALUE_LIST = ('0.0.0.0:443','0.0.0.0:80');

CREATE OR REPLACE NETWORK RULE COMMON.COMPUTE.SNOWFLAKE_ACCOUNT_RULE
  MODE = 'EGRESS'
  TYPE = 'HOST_PORT'
  VALUE_LIST = ('yourorgname-youraccountname.snowflakecomputing.com');

CREATE EXTERNAL ACCESS INTEGRATION ALLOW_ALL_EXTERNAL_ACCESS_INT
  ALLOWED_NETWORK_RULES = (COMMON.COMPUTE.ALLOW_ALL_RULE)
  ENABLED = true;

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION SNOWFLAKE_ACCOUNT_ACCESS_INT
  ALLOWED_NETWORK_RULES = (COMMON.COMPUTE.SNOWFLAKE_ACCOUNT_RULE)
  ENABLED = true;

GRANT USAGE ON INTEGRATION ALLOW_ALL_EXTERNAL_ACCESS_INT TO ROLE COMPUTE;
GRANT USAGE ON INTEGRATION SNOWFLAKE_ACCOUNT_ACCESS_INT TO ROLE COMPUTE;

Deploy Prefect worker

If you're planning to run the worker locally (and not in Snowflake), you can skip this section.

Make sure you've set up the Snow CLI correctly, then log in to the SPCS image registry so you can push images to Snowflake:

snow spcs image-registry login --role compute

Once logged in, you can build and push your Docker image the way you normally would, for example:
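
Something like the following (a sketch: the registry URL follows the image repository created above, and SPCS compute pools run x86 nodes, so build for linux/amd64):

docker build --platform linux/amd64 -t yourorg-youraccount.registry.snowflakecomputing.com/common/compute/prefect/prefect-worker:latest .
docker push yourorg-youraccount.registry.snowflakecomputing.com/common/compute/prefect/prefect-worker:latest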

Finally, my Python script for starting the worker using the Snowflake Python SDK looks a little bit like this (simplified):

import os
from textwrap import dedent

from snowflake.core import Root
from snowflake.core.service import Service, ServiceSpec
from snowflake.snowpark import Session

# Connect using key-pair authentication as the COMPUTE role.
connection_parameters = {
    "account": os.environ["SNOWFLAKE_ACCOUNT"],
    "user": os.environ["SNOWFLAKE_USER"],
    "private_key_file": os.environ["SNOWFLAKE_PRIVATE_KEY_PATH"],
    "private_key_file_pwd": os.environ["SNOWFLAKE_PASSWORD"],
    "role": "COMPUTE",
}

session = Session.builder.configs(connection_parameters).create()
root = Root(session)

# Drop any existing worker service so it can be recreated below.
service_name = "prefect_worker"
services = root.databases["common"].schemas["compute"].services.iter(like=service_name)

for existing_service in services:
    if existing_service.name == service_name.upper():
        print(f"Service {existing_service.name} already exists, dropping...")
        root.databases["common"].schemas["compute"].services[existing_service.name].drop()
        break

worker_name = "prefect-worker"
work_pool_name = "spcs"

service_spec = dedent(f"""
    spec:
      containers:
      - name: {service_name.replace("_", "-")}
        image: yourorg-youraccount.registry.snowflakecomputing.com/common/compute/prefect/prefect-worker:latest
        args:
        - /bin/bash
        - -c
        - prefect worker start --name {worker_name} --pool {work_pool_name}
        env:
          PREFECT_API_URL: *YOUR PREFECT API URL HERE*
          PREFECT_API_KEY: *YOUR PREFECT API KEY HERE*
          PREFECT_API_ENABLE_HTTP2: false
        resources:
          requests:
            cpu: 0.1
            memory: 0.5G
          limits:
            cpu: 0.1
            memory: 0.5G
      logExporters:
        eventTableConfig:
          logLevel: INFO
      platformMonitor:
        metricConfig:
          groups:
          - system
          - system_limits
    """)

print(f"Starting service {service_name}...")

service_comment = "Prefect worker"

service = Service(
    name=service_name,
    compute_pool="orchestration",
    spec=ServiceSpec(service_spec),
    external_access_integrations=["allow_all_external_access_int", "snowflake_account_access_int"],
    comment=service_comment,
)

root.databases["common"].schemas["compute"].services.create(service)

print("Restart successful!")

Once the worker is up and running, you can configure the settings you need in the work pool. Let me know if you have any issues and I can try to describe all of the required settings.
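
If the work pool doesn't exist yet, you can create it up front. The type string below is only a placeholder; use whatever worker type this PR actually registers:

prefect work-pool create spcs --type snowflake-spcs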

Register Prefect deployment

Once the worker is up and running, you should be able to register deployments for it the way you would with any other worker. Don't forget to run snow spcs image-registry login --role compute before pushing images to your image registry in Snowflake.
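
A minimal sketch of what that could look like with Prefect's Python deployment API (the flow and image names are made up; .deploy() builds and pushes the image by default):

from prefect import flow

@flow(log_prints=True)
def hello_spcs():
    print("Hello from Snowpark Container Services!")

if __name__ == "__main__":
    # Build and push the flow image, then register a deployment
    # against the "spcs" work pool the worker is polling.
    hello_spcs.deploy(
        name="hello-spcs",
        work_pool_name="spcs",
        image="yourorg-youraccount.registry.snowflakecomputing.com/common/compute/prefect/hello-flow:latest",
    )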
