Skip to content

Commit

Permalink
chore: start tasks based on processing batch size in the queue-proces…
Browse files Browse the repository at this point in the history
…sing example (#220)

* fix: address typo in task definition variable name

* chore: start tasks based on processing batch size in the queue-processing example

---------

Co-authored-by: Jooyoung Kim <[email protected]>
  • Loading branch information
luigidifraiawork and joozero authored Jul 1, 2024
1 parent 222bfd7 commit 512eed8
Showing 1 changed file with 27 additions and 23 deletions.
50 changes: 27 additions & 23 deletions application-code/lambda-function-queue-trigger/lambda_function.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import boto3, os
import boto3
from math import ceil

sqs = boto3.client('sqs')
ecs = boto3.client('ecs')
ssm = boto3.client('ssm')

batch_size = 10
max_tasks_per_run = 100


def lambda_handler(event, context):
max_tasks = None
sqs_url = None
job_mode = None
pipeline_enabled = None
TASK_CLUSTER = None
TASK_CONTAINER = None
Expand Down Expand Up @@ -62,48 +65,48 @@ def lambda_handler(event, context):
return
sqs_response = sqs.get_queue_attributes(
QueueUrl=sqs_url,
AttributeNames=[ 'ApproximateNumberOfMessages' ]
AttributeNames=['ApproximateNumberOfMessages']
)
sqs_queue_size = int(sqs_response['Attributes']['ApproximateNumberOfMessages'])
print("Current SQS Queue size: " + str(sqs_queue_size))
if sqs_queue_size == 0:
return
ecs_response = ecs.list_tasks(
cluster=TASK_CLUSTER,maxResults=100,desiredStatus='RUNNING',family=TASK_CONTAINER)
cluster=TASK_CLUSTER, maxResults=100, desiredStatus='RUNNING', family=TASK_CONTAINER)
current_running_tasks = len(ecs_response["taskArns"])
available_tasks = max_tasks - current_running_tasks
tasks_to_start = min([sqs_queue_size, available_tasks, max_tasks_per_run, max_tasks])
tasks_to_start = min([ceil(sqs_queue_size / batch_size), available_tasks, max_tasks_per_run, max_tasks])
print("ECS Tasks to start: " + str(tasks_to_start))
if tasks_to_start<=0:
if tasks_to_start <= 0:
return
run_task_response = ecs.run_task(
capacityProviderStrategy=[
{
'capacityProvider': 'FARGATE',
'weight': 1,
'base': 2
'capacityProvider': 'FARGATE',
'weight': 1,
'base': 2
}, {
'capacityProvider': 'FARGATE_SPOT',
'weight': 4,
'base': 0
'capacityProvider': 'FARGATE_SPOT',
'weight': 4,
'base': 0
}
],
cluster=TASK_CLUSTER,
taskDefinition=TASK_DEFINITION,
overrides={
'containerOverrides': [
{
'name': TASK_CONTAINER,
'environment': [
{
'name': 'PIPELINE_ECS_JOB_MODE',
'value': '1'
}, {
'name': 'PIPELINE_S3_DEST_PREFIX',
'value': S3_DEST_PREFIX
'name': TASK_CONTAINER,
'environment': [
{
'name': 'PIPELINE_ECS_JOB_MODE',
'value': '1'
}, {
'name': 'PIPELINE_S3_DEST_PREFIX',
'value': S3_DEST_PREFIX
}
]
}
]
}
]
},
count=tasks_to_start,
Expand All @@ -114,6 +117,7 @@ def lambda_handler(event, context):
'securityGroups': [TASK_SECURITYGROUP],
'assignPublicIp': 'DISABLED'
}
}
},
propagateTags='TASK_DEFINITION'
)
return tasks_to_start

0 comments on commit 512eed8

Please sign in to comment.