chore: start tasks based on processing batch size in the queue-processing example
luigidifraiawork committed Jun 15, 2024
1 parent 1aafef3 commit b2c0d87
Showing 1 changed file with 31 additions and 26 deletions.
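
The commit title says ECS tasks are now started per processing batch rather than per queued message. The visible hunks in the diff below appear to add `from math import ceil` and a `batch_size = 10` constant, while the calculation that uses them sits in a collapsed part of the diff, so the following is only a minimal sketch of the likely idea, with hypothetical sample numbers standing in for the values the Lambda reads from SQS, ECS and SSM at run time:

```python
from math import ceil

# Hypothetical sample values; in the Lambda these come from SQS
# (ApproximateNumberOfMessages), the ECS task list and SSM at run time.
sqs_queue_size = 37          # messages waiting in the queue
max_tasks = 90               # PIPELINE_ECS_MAX_TASKS
current_running_tasks = 10   # tasks already RUNNING in the cluster
max_tasks_per_run = 100      # per-invocation cap
batch_size = 10              # messages each ECS task is expected to drain

available_tasks = max_tasks - current_running_tasks

# One task per batch of messages instead of one task per message,
# still bounded by spare capacity and the per-run cap.
tasks_to_start = min([ceil(sqs_queue_size / batch_size),
                      available_tasks, max_tasks_per_run, max_tasks])

print(tasks_to_start)  # 4 with the sample numbers above
```

Sizing the task count by batches keeps a short queue from fanning out one Fargate task per message while still scaling out when the backlog is large.
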
57 changes: 31 additions & 26 deletions application-code/lambda-function-queue-trigger/lambda_function.py
@@ -1,14 +1,17 @@
import boto3, os
import boto3
from math import ceil

sqs = boto3.client('sqs')
ecs = boto3.client('ecs')
ssm = boto3.client('ssm')

batch_size = 10
max_tasks_per_run = 100


def lambda_handler(event, context):
max_tasks = None
sqs_url = None
job_mode = None
pipeline_enabled = None
TASK_CLUSTER = None
TASK_CONTAINER = None
@@ -52,58 +55,59 @@ def lambda_handler(event, context):
if param['Name'] == 'PIPELINE_S3_DEST_PREFIX':
S3_DEST_PREFIX = param['Value']
if (sqs_url and pipeline_enabled and max_tasks and
TASK_CLUSTER and TASK_CONTAINER and TASK_DEFINITION and TASK_SUBNET and TASK_SECURITYGROUP):
TASK_CLUSTER and TASK_CONTAINER and TASK_DEFINITION and TASK_SUBNET and TASK_SECURITYGROUP):
max_tasks = int(max_tasks)
else:
raise Exception("Required SSM: PIPELINE_ECS_MAX_TASKS,PIPELINE_UNPROCESSED_SQS_URL,PIPELINE_ENABLED,PIPELINE_ECS_CLUSTER,"
"PIPELINE_ECS_TASK_CONTAINER,PIPELINE_ECS_TASK_DEFINITION,PIPELINE_ECS_TASK_SUBNET,PIPELINE_ECS_TASK_SECURITYGROUP,PIPELINE_S3_DEST_PREFIX")
if (pipeline_enabled != "1"):
raise Exception("Required SSM: PIPELINE_ECS_MAX_TASKS,PIPELINE_UNPROCESSED_SQS_URL,PIPELINE_ENABLED,"
"PIPELINE_ECS_CLUSTER,PIPELINE_ECS_TASK_CONTAINER,PIPELINE_ECS_TASK_DEFINITION,"
"PIPELINE_ECS_TASK_SUBNET,PIPELINE_ECS_TASK_SECURITYGROUP,PIPELINE_S3_DEST_PREFIX")
if pipeline_enabled != "1":
print("ECS Pipeline is Disabled. Not starting tasks via Lambda.")
return
sqs_response = sqs.get_queue_attributes(
QueueUrl=sqs_url,
AttributeNames=[ 'ApproximateNumberOfMessages' ]
AttributeNames=['ApproximateNumberOfMessages']
)
sqs_queue_size = int(sqs_response['Attributes']['ApproximateNumberOfMessages'])
print("Current SQS Queue size: " + str(sqs_queue_size))
if sqs_queue_size == 0:
return
ecs_response = ecs.list_tasks(
cluster=TASK_CLUSTER,maxResults=100,desiredStatus='RUNNING',family=TASK_CONTAINER)
cluster=TASK_CLUSTER, maxResults=100, desiredStatus='RUNNING', family=TASK_CONTAINER)
current_running_tasks = len(ecs_response["taskArns"])
available_tasks = max_tasks - current_running_tasks
tasks_to_start = min([sqs_queue_size, available_tasks, max_tasks_per_run, max_tasks])
print("ECS Tasks to start: " + str(tasks_to_start))
if tasks_to_start<=0:
if tasks_to_start <= 0:
return
run_task_response = ecs.run_task(
capacityProviderStrategy=[
{
'capacityProvider': 'FARGATE',
'weight': 1,
'base': 2
'capacityProvider': 'FARGATE',
'weight': 1,
'base': 2
}, {
'capacityProvider': 'FARGATE_SPOT',
'weight': 4,
'base': 0
'capacityProvider': 'FARGATE_SPOT',
'weight': 4,
'base': 0
}
],
cluster=TASK_CLUSTER,
taskDefinition=TASK_DEFINITION,
overrides={
'containerOverrides': [
{
'name': TASK_CONTAINER,
'environment': [
{
'name': 'PIPELINE_ECS_JOB_MODE',
'value': '1'
}, {
'name': 'PIPELINE_S3_DEST_PREFIX',
'value': S3_DEST_PREFIX
'name': TASK_CONTAINER,
'environment': [
{
'name': 'PIPELINE_ECS_JOB_MODE',
'value': '1'
}, {
'name': 'PIPELINE_S3_DEST_PREFIX',
'value': S3_DEST_PREFIX
}
]
}
]
}
]
},
count=tasks_to_start,
@@ -114,6 +118,7 @@ def lambda_handler(event, context):
'securityGroups': [TASK_SECURITYGROUP],
'assignPublicIp': 'DISABLED'
}
}
},
propagateTags='TASK_DEFINITION'
)
return tasks_to_start
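
The SSM lookups that populate `sqs_url`, `max_tasks`, `TASK_CLUSTER` and the other settings are collapsed in this view; only the `PIPELINE_S3_DEST_PREFIX` branch is visible. Below is a minimal sketch of how those values are presumably fetched, assuming a plain `get_parameters` call with the names listed in the handler's exception message (the collapsed code may instead use a parameter path prefix or individual `get_parameter` calls):

```python
import boto3

ssm = boto3.client('ssm')

# Parameter names copied from the handler's exception message; get_parameters
# accepts at most 10 names per call, which these 9 satisfy.
PARAMETER_NAMES = [
    'PIPELINE_ECS_MAX_TASKS',
    'PIPELINE_UNPROCESSED_SQS_URL',
    'PIPELINE_ENABLED',
    'PIPELINE_ECS_CLUSTER',
    'PIPELINE_ECS_TASK_CONTAINER',
    'PIPELINE_ECS_TASK_DEFINITION',
    'PIPELINE_ECS_TASK_SUBNET',
    'PIPELINE_ECS_TASK_SECURITYGROUP',
    'PIPELINE_S3_DEST_PREFIX',
]

response = ssm.get_parameters(Names=PARAMETER_NAMES)

# The handler matches on param['Name'] and copies param['Value'] into local
# variables, as the visible hunk shows for PIPELINE_S3_DEST_PREFIX.
for param in response['Parameters']:
    print(param['Name'], '=', param['Value'])
```
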
