Added worker for Snowflake's Snowpark Container Services #16393
base: main
Conversation
This pull request is stale because it has been open 14 days with no activity. To keep this pull request open remove stale label or comment.
Is anything else required for this to be merged?
Hey @bjorhn! Apologies for the delay in review. Could you share some instructions for how to test out this worker? I haven't used Snowpark Container Services before, so some guides or documentation would be very helpful!
Hey @desertaxle! No problem. Give me a week or so and I'll update the PR with testing instructions.
I've "reverse-engineered" some of our setup scripts, CLI helper tools, etc. into something you should be able to use. I haven't tested it, so there might be a spelling error or some minor detail missing (such as imports for the Python script), but hopefully it should be relatively straightforward. Let me know otherwise and I'll write more details.

**Configure Snowflake**

Create a new database and schema for storing the compute pools, images, etc.:

```sql
USE ROLE SYSADMIN;
CREATE DATABASE COMMON;
CREATE SCHEMA COMMON.COMPUTE;
```

Create the COMPUTE role and assign the required grants:

```sql
USE ROLE ACCOUNTADMIN;
CREATE ROLE COMPUTE;
GRANT ROLE COMPUTE TO ROLE SYSADMIN;
GRANT ALL ON WAREHOUSE EXPLORATION TO ROLE COMPUTE;
GRANT USAGE ON DATABASE COMMON TO ROLE COMPUTE;
GRANT USAGE, CREATE IMAGE REPOSITORY, CREATE STAGE, CREATE SERVICE, BIND SERVICE ENDPOINT, CREATE TABLE ON SCHEMA COMMON.COMPUTE TO ROLE COMPUTE;
GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE COMPUTE;
```
Create a compute pool and grant related privileges:

```sql
USE ROLE ACCOUNTADMIN;
CREATE COMPUTE POOL ORCHESTRATION
  MIN_NODES = 1
  MAX_NODES = 1
  INSTANCE_FAMILY = CPU_X64_S;
GRANT USAGE, MONITOR ON COMPUTE POOL ORCHESTRATION TO ROLE COMPUTE;
```
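Optional, but handy while setting things up: you can confirm the pool has provisioned before pointing a service at it. A quick sketch using standard Snowflake commands (adjust the pool name if yours differs):

```sql
SHOW COMPUTE POOLS LIKE 'ORCHESTRATION';
DESCRIBE COMPUTE POOL ORCHESTRATION;
```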
Create an image repository:

```sql
USE ROLE ACCOUNTADMIN;
CREATE IMAGE REPOSITORY IF NOT EXISTS COMMON.COMPUTE.PREFECT;
```
Create a Prefect API secret and grant the required privileges on the secret:

```sql
USE ROLE ACCOUNTADMIN;
CREATE SECRET COMMON.COMPUTE.PREFECT_API_KEY_SECRET
  TYPE = GENERIC_STRING
  SECRET_STRING = '**redacted**';
GRANT READ ON SECRET COMMON.COMPUTE.PREFECT_API_KEY_SECRET TO ROLE COMPUTE;
```
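As a possible refinement: instead of hard-coding the API key into the service spec's `env` block, an SPCS service spec can expose a Snowflake secret to the container as an environment variable. A hedged sketch of what that fragment could look like; the field names (`secrets`, `snowflakeSecret`, `secretKeyRef`, `envVarName`) are from Snowflake's service-spec reference and worth double-checking against current docs:

```yaml
spec:
  containers:
    - name: prefect-worker
      image: yourorg-youraccount.registry.snowflakecomputing.com/common/compute/prefect/prefect-worker:latest
      secrets:
        - snowflakeSecret: COMMON.COMPUTE.PREFECT_API_KEY_SECRET
          secretKeyRef: secret_string
          envVarName: PREFECT_API_KEY
```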
Create network rules for egress, since no egress is allowed by default (note: the rules and integrations below consistently use the COMMON.COMPUTE schema created earlier):

```sql
USE ROLE ACCOUNTADMIN;
CREATE NETWORK RULE COMMON.COMPUTE.ALLOW_ALL_RULE
  TYPE = 'HOST_PORT'
  MODE = 'EGRESS'
  VALUE_LIST = ('0.0.0.0:443','0.0.0.0:80');
CREATE OR REPLACE NETWORK RULE COMMON.COMPUTE.SNOWFLAKE_ACCOUNT_RULE
  MODE = 'EGRESS'
  TYPE = 'HOST_PORT'
  VALUE_LIST = ('yourorgname-youraccountname.snowflakecomputing.com');
CREATE EXTERNAL ACCESS INTEGRATION ALLOW_ALL_EXTERNAL_ACCESS_INT
  ALLOWED_NETWORK_RULES = (COMMON.COMPUTE.ALLOW_ALL_RULE)
  ENABLED = true;
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION SNOWFLAKE_ACCOUNT_ACCESS_INT
  ALLOWED_NETWORK_RULES = (COMMON.COMPUTE.SNOWFLAKE_ACCOUNT_RULE)
  ENABLED = true;
GRANT USAGE ON INTEGRATION ALLOW_ALL_EXTERNAL_ACCESS_INT TO ROLE COMPUTE;
GRANT USAGE ON INTEGRATION SNOWFLAKE_ACCOUNT_ACCESS_INT TO ROLE COMPUTE;
```

**Deploy Prefect worker**

If you're planning to run the worker locally (and not in Snowflake), you can skip this section. Make sure you've set up the `snow` CLI correctly, then log into SPCS so you can push images to Snowflake:

```shell
snow spcs image-registry login --role compute
```

Once logged in, you can build and push your Docker image the way you normally would. Finally, my Python script for starting the worker using the Snowflake Python SDK looks a little bit like this (simplified):
```python
import os
from textwrap import dedent

from snowflake.core import Root
from snowflake.core.service import Service, ServiceSpec
from snowflake.snowpark import Session

connection_parameters = {
    "account": os.environ["SNOWFLAKE_ACCOUNT"],
    "user": os.environ["SNOWFLAKE_USER"],
    "private_key_file": os.environ["SNOWFLAKE_PRIVATE_KEY_PATH"],
    "private_key_file_pwd": os.environ["SNOWFLAKE_PASSWORD"],
    "role": "COMPUTE",
}
session = Session.builder.configs(connection_parameters).create()
root = Root(session)

# Drop the existing worker service, if any, before recreating it.
service_name = "prefect_worker"
services = root.databases["common"].schemas["compute"].services.iter(like=service_name)
for existing_service in services:
    if existing_service.name == service_name.upper():
        print(f"Service {existing_service.name} already exists, dropping...")
        root.databases["common"].schemas["compute"].services[existing_service.name].drop()
        break

worker_name = "prefect-worker"
work_pool_name = "spcs"
service_spec = dedent(f"""
    spec:
      containers:
        - name: {service_name.replace("_", "-")}
          image: yourorg-youraccount.registry.snowflakecomputing.com/common/compute/prefect/prefect-worker:latest
          args:
            - /bin/bash
            - -c
            - prefect worker start --name {worker_name} --pool {work_pool_name}
          env:
            PREFECT_API_URL: *YOUR PREFECT API URL HERE*
            PREFECT_API_KEY: *YOUR PREFECT API KEY HERE*
            PREFECT_API_ENABLE_HTTP2: false
          resources:
            requests:
              cpu: 0.1
              memory: 0.5G
            limits:
              cpu: 0.1
              memory: 0.5G
      logExporters:
        eventTableConfig:
          logLevel: INFO
      platformMonitor:
        metricConfig:
          groups:
            - system
            - system_limits
    """)

print(f"Starting service {service_name}...")
service_comment = "Prefect worker"
service = Service(
    name=service_name,
    compute_pool="orchestration",
    spec=ServiceSpec(service_spec),
    external_access_integrations=["allow_all_external_access_int", "snowflake_account_access_int"],
    comment=service_comment,
)
root.databases["common"].schemas["compute"].services.create(service)
print("Restart successful!")
```

Once the worker is up and running you can configure the settings you need in the work pool. Let me know if you have any issues and I can try to describe all of the required settings.

**Register Prefect deployment**

Once the worker is up and running you should be able to register deployments for the worker the way you would with any other worker. Don't forget to run `snow spcs image-registry login --role compute` before pushing images to your image registry in Snowflake.
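One small note on the script above: the inline f-string spec is easy to get subtly wrong, since YAML is indentation-sensitive and `dedent` only strips the common leading whitespace. A hypothetical refactor (not part of the PR; names are illustrative) that isolates the spec-building so it can be sanity-checked without a Snowflake connection:

```python
from textwrap import dedent


def build_worker_spec(service_name: str, image: str, worker_name: str, work_pool_name: str) -> str:
    """Render a minimal SPCS service spec for a Prefect worker container.

    Underscores are replaced with hyphens in the container name, mirroring
    the script above (container names appear not to allow underscores).
    """
    container_name = service_name.replace("_", "-")
    return dedent(f"""\
        spec:
          containers:
            - name: {container_name}
              image: {image}
              args:
                - /bin/bash
                - -c
                - prefect worker start --name {worker_name} --pool {work_pool_name}
        """)


spec = build_worker_spec(
    "prefect_worker",
    "yourorg-youraccount.registry.snowflakecomputing.com/common/compute/prefect/prefect-worker:latest",
    "prefect-worker",
    "spcs",
)
print(spec)
```

Keeping the spec construction in a pure function like this makes it trivial to unit-test the YAML output separately from the service-creation call.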
This PR adds a new worker to the prefect-snowflake integration. The worker can run service jobs in Snowflake's Snowpark Container Services. Closes #15674
A few things worth mentioning:
I don't know the best way to test the worker after moving the modules over from our company repo to the Prefect repo. Perhaps someone could test it for me, or leave instructions in a comment. The code should be the same as it was before; it's just some import paths that have changed.
There are two bugs I've bumped into along the way which I don't think are caused by my code. I don't know if these issues only affect custom workers, but I think so: