From d7c6b972fa47b1da1c1707b215529907dfe2890d Mon Sep 17 00:00:00 2001
From: Cristian Goina
Date: Thu, 16 Nov 2023 11:06:23 -0500
Subject: [PATCH] Dask container recipe (#545)

---
 dask/2023.10.1-py11-ol9/Dockerfile            |  56 ++++++++++
 dask/2023.10.1-py11-ol9/conda-env.yml         |  28 +++++
 .../2023.10.1-py11-ol9/config/dask-config.yml |  48 ++++++++
 .../scripts/determine_ip.sh                   |  30 +++++
 dask/2023.10.1-py11-ol9/scripts/prepare.sh    |  12 ++
 .../scripts/startmanager.sh                   |  81 ++++++++++++++
 .../2023.10.1-py11-ol9/scripts/startworker.sh | 104 ++++++++++++++++++
 .../scripts/waitforanyfile.sh                 |  43 ++++++++
 .../scripts/waitformanager.sh                 |  33 ++++++
 .../scripts/waitforworkers.sh                 |  89 +++++++++++++++
 10 files changed, 524 insertions(+)
 create mode 100644 dask/2023.10.1-py11-ol9/Dockerfile
 create mode 100644 dask/2023.10.1-py11-ol9/conda-env.yml
 create mode 100644 dask/2023.10.1-py11-ol9/config/dask-config.yml
 create mode 100644 dask/2023.10.1-py11-ol9/scripts/determine_ip.sh
 create mode 100644 dask/2023.10.1-py11-ol9/scripts/prepare.sh
 create mode 100644 dask/2023.10.1-py11-ol9/scripts/startmanager.sh
 create mode 100644 dask/2023.10.1-py11-ol9/scripts/startworker.sh
 create mode 100644 dask/2023.10.1-py11-ol9/scripts/waitforanyfile.sh
 create mode 100644 dask/2023.10.1-py11-ol9/scripts/waitformanager.sh
 create mode 100644 dask/2023.10.1-py11-ol9/scripts/waitforworkers.sh

diff --git a/dask/2023.10.1-py11-ol9/Dockerfile b/dask/2023.10.1-py11-ol9/Dockerfile
new file mode 100644
index 00000000..4ef64088
--- /dev/null
+++ b/dask/2023.10.1-py11-ol9/Dockerfile
@@ -0,0 +1,56 @@
+ARG DASK_VERSION=2023.10.1
+
+FROM oraclelinux:9
+
+LABEL maintainer="biocontainers"
+
+LABEL base_image="oraclelinux:9"
+LABEL version="1"
+LABEL software="Dask"
+LABEL software.version="2023.10.1-py11-ol9"
+LABEL about.summary="Dask is a flexible library for parallel computing in Python."
+LABEL about.home="https://www.dask.org"
+LABEL about.license="BSD-3-Clause"
+LABEL about.license_file="https://github.com/dask/dask/blob/main/LICENSE.txt"
+LABEL about.documentation="https://docs.dask.org/en/stable/"
+LABEL extra.binaries="/opt/mambaforge/bin"
+LABEL extra.scripts="/opt/scripts/daskscripts"
+LABEL about.tags="implemented-in::python, interface::daemon, role::devel-lib"
+LABEL conda_forge.minforge.version="23.3.1-1"
+LABEL python.version="3.11"
+
+ARG TARGETPLATFORM
+
+ENV NVIDIA_VISIBLE_DEVICES=all
+ENV NVIDIA_DRIVER_CAPABILITIES=all
+
+RUN dnf update -y && \
+    dnf install -y \
+        tar wget \
+        hostname \
+        procps-ng \
+        net-tools \
+        which && dnf clean all
+
+# Install miniconda (mambaforge); remove the installer in the same layer
+RUN wget https://github.com/conda-forge/miniforge/releases/download/23.3.1-1/Mambaforge-23.3.1-1-Linux-$(uname -m).sh \
+    -O mamba-install.sh && \
+    bash mamba-install.sh -b -p /opt/mambaforge && rm mamba-install.sh
+
+ENV PATH=/opt/mambaforge/bin:${PATH}
+
+COPY conda-env.yml /tmp/
+
+RUN mamba env update -n base -f /tmp/conda-env.yml && \
+    rm -rf /opt/mambaforge/pkgs /tmp/conda-env.yml
+# (package-cache removal is in the same layer so it actually shrinks the image)
+
+WORKDIR /opt/scripts/daskscripts
+
+ENV DASK_CONFIG=/opt/scripts/daskscripts/config/dask-config.yml
+
+# Add scripts
+COPY scripts/* /opt/scripts/daskscripts/
+COPY config /opt/scripts/daskscripts/config
+
+RUN chmod 755 /opt/scripts/daskscripts/*.sh
diff --git a/dask/2023.10.1-py11-ol9/conda-env.yml b/dask/2023.10.1-py11-ol9/conda-env.yml
new file mode 100644
index 00000000..5eca4a88
--- /dev/null
+++ b/dask/2023.10.1-py11-ol9/conda-env.yml
@@ -0,0 +1,28 @@
+channels:
+  - conda-forge
+  - defaults
+dependencies:
+  - python=3.11
+  - pip
+  - bokeh>=2.4.2,<3
+  - cachey=0.2.1
+  - cytoolz>=0.12
+  - dask=2023.10.1
+  - distributed=2023.10.1
+  - h5py
+  - jq=1.6
+  - jupyter-server-proxy
+  - lz4>=4.3.2
+  - msgpack-python
+  - nomkl=3.0
+  - numpy>=1.26.0
+  - pandas=1.5.1
+  - psutil>=5.7.2
+  - python-blosc=1.10.6
+  - s3fs>=2021.9.0
+  - scipy>=1.9.1
+  - streamz=0.6.3
+  - tblib>=1.6.0
+  - zarr=2.16.1
+  - pip:
+    - cloudpickle==3.0.0
diff --git a/dask/2023.10.1-py11-ol9/config/dask-config.yml b/dask/2023.10.1-py11-ol9/config/dask-config.yml
new file mode 100644
index 00000000..aefac438
--- /dev/null
+++ b/dask/2023.10.1-py11-ol9/config/dask-config.yml
@@ -0,0 +1,48 @@
+distributed:
+  admin:
+    tick:
+      interval: 500ms # time between event loop health checks
+      limit: 2h # time allowed between triggering a warning
+
+  comm:
+    retry:
+      count: 10
+
+    timeouts:
+      connect: 600s
+
+  # nanny settings belong directly under the top-level "distributed" key
+  nanny:
+    pre-spawn-environ:
+      MALLOC_TRIM_THRESHOLD_: 0
+
+  scheduler:
+    allowed-failures: 5 # default 3
+    worker-ttl: 120s # default 5min
+    unknown-task-duration: 120s
+
+    locks:
+      lease-timeout: 900s
+      lease-validation-interval: 300s
+
+  worker:
+    use-file-locking: False
+
+    lifetime:
+      stagger: '5 minutes'
+
+    memory:
+      transfer: 0.5 # default 0.1
+      target: 0.9 # default 0.6
+      spill: 0.9 # default 0.7
+      pause: false # default 0.8
+      terminate: false # default 0.95
+      recent-to-old-time: 300s
+      monitor-interval: 500ms # default 100ms
+
+    rebalance:
+      sender-min: 0.5
+      recipient-max: 0.5
+
+    profile:
+      interval: 50ms # default 10ms
diff --git a/dask/2023.10.1-py11-ol9/scripts/determine_ip.sh b/dask/2023.10.1-py11-ol9/scripts/determine_ip.sh
new file mode 100644
index 00000000..b98b0053
--- /dev/null
+++ b/dask/2023.10.1-py11-ol9/scripts/determine_ip.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Determine the IP address of the current host
+
+container_engine=$1
+
+local_ip=
+if [ "$container_engine" = "docker" ]; then
+    for interface in /sys/class/net/{eth*,en*,em*}; do
+        [ -e $interface ] && \
+        [ `cat $interface/operstate` == "up" ] && \
+        local_ip=$(ifconfig `basename $interface` | grep "inet " | awk '$1=="inet" {print $2; exit}' | sed s/addr://g)
+        if [[ "$local_ip" != "" ]]; then
+            echo "Use IP: $local_ip"
+            break
+        fi
+    done
+    if [[ -z "${local_ip}" ]] ; then
+        echo "Could not determine local IP: local_ip is empty"
+        exit 1
+    fi
+else
+    # Take the last IP that's listed by hostname -i.
+    # This hack works on Janelia Cluster and AWS EC2.
+    local_ip=`hostname -i | rev | cut -d' ' -f1 | rev`
+    echo "Use IP: $local_ip"
+    if [[ -z "${local_ip}" ]] ; then
+        echo "Could not determine local IP: local_ip is empty"
+        exit 1
+    fi
+fi
diff --git a/dask/2023.10.1-py11-ol9/scripts/prepare.sh b/dask/2023.10.1-py11-ol9/scripts/prepare.sh
new file mode 100644
index 00000000..63ff12c4
--- /dev/null
+++ b/dask/2023.10.1-py11-ol9/scripts/prepare.sh
@@ -0,0 +1,12 @@
+#!/bin/bash -ue
+# Prepare the given dir by creating it, or cleaning it
+# of Dask content if it already exists.
+work_dir=$1
+
+if [[ ! -d "${work_dir}" ]] ; then
+    echo "Creating work directory: ${work_dir}"
+    mkdir -p "${work_dir}"
+else
+    echo "Cleaning existing work directory: ${work_dir}"
+    rm -rf "${work_dir}"/* || true
+fi
diff --git a/dask/2023.10.1-py11-ol9/scripts/startmanager.sh b/dask/2023.10.1-py11-ol9/scripts/startmanager.sh
new file mode 100644
index 00000000..37e1ca18
--- /dev/null
+++ b/dask/2023.10.1-py11-ol9/scripts/startmanager.sh
@@ -0,0 +1,81 @@
+#!/bin/bash -ue
+# Start the Dask scheduler process and wait for terminate signal
+DIR=$(cd "$(dirname "$0")"; pwd)
+
+container_engine=
+scheduler_pid_file=
+terminate_file=
+scheduler_start_timeout=
+scheduler_poll_interval=
+scheduler_work_dir=.
+args=()
+
+while [[ $# -gt 0 ]]; do
+    key="$1"
+    shift # past the key
+    case $key in
+        --container-engine)
+            container_engine="$1"
+            shift
+            ;;
+        --pid-file)
+            scheduler_pid_file=$1
+            args=("${args[@]}" "--pid-file" "${scheduler_pid_file}")
+            shift
+            ;;
+        --scheduler-start-timeout)
+            scheduler_start_timeout=$1
+            shift
+            ;;
+        --scheduler-poll-interval)
+            scheduler_poll_interval=$1
+            shift
+            ;;
+        --scheduler-work-dir)
+            scheduler_work_dir=$1
+            shift
+            ;;
+        --terminate-file)
+            terminate_file=$1
+            shift
+            ;;
+        *)
+            args=("${args[@]}" "${key}")
+            ;;
+    esac
+done
+
+function cleanup() {
+    echo "Killing scheduler background processes"
+    if [[ -f "${scheduler_pid_file}" ]]; then
+        local dpid=$(cat "${scheduler_pid_file}")
+        kill "$dpid"
+    fi
+    exit 0
+}
+trap cleanup INT TERM EXIT
+
+echo "Determining scheduler IP address..."
+. $DIR/determine_ip.sh $container_engine
+
+# start scheduler in background
+echo "Run: dask scheduler --host ${local_ip} ${args[@]}"
+dask scheduler --host ${local_ip} "${args[@]}" \
+    2> >(tee ${scheduler_work_dir}/dask-scheduler.log >&2) \
+    &
+
+# wait for PID file
+# make sure there is a timeout param since the default wait does not timeout
+${DIR}/waitforanyfile.sh 0 "${terminate_file},${scheduler_pid_file}" ${scheduler_start_timeout}
+
+if [[ -e "${scheduler_pid_file}" ]] ; then
+    scheduler_pid=$(cat "${scheduler_pid_file}")
+    echo "Scheduler started: pid=$scheduler_pid"
+
+    # scheduler was started - wait until terminate
+    echo "Wait for termination event: ${terminate_file}"
+    ${DIR}/waitforanyfile.sh ${scheduler_pid} "${terminate_file}"
+else
+    echo "Scheduler pid file not found"
+    scheduler_pid=0
+fi
diff --git a/dask/2023.10.1-py11-ol9/scripts/startworker.sh b/dask/2023.10.1-py11-ol9/scripts/startworker.sh
new file mode 100644
index 00000000..502aeef2
--- /dev/null
+++ b/dask/2023.10.1-py11-ol9/scripts/startworker.sh
@@ -0,0 +1,104 @@
+#!/bin/bash -ue
+# Start the Dask worker process and wait for terminate signal
+DIR=$(cd "$(dirname "$0")"; pwd)
+
+container_engine=
+worker_name=
+worker_dir=
+worker_pid_file=
+terminate_file=
+worker_start_timeout=
+worker_poll_interval=
+scheduler_address=
+args=()
+
+while [[ $# -gt 0 ]]; do
+    key="$1"
+    shift # past the key
+    case $key in
+        --container-engine)
+            container_engine="$1"
+            shift
+            ;;
+        --pid-file)
+            worker_pid_file=$1
+            args=("${args[@]}" "--pid-file" "${worker_pid_file}")
+            shift
+            ;;
+        --name)
+            worker_name=$1
+            args=("${args[@]}" "--name" "${worker_name}")
+            shift
+            ;;
+        --worker-dir)
+            worker_dir=$1
+            shift
+            ;;
+        --scheduler-address)
+            scheduler_address=$1
+            shift
+            ;;
+        --worker-start-timeout)
+            worker_start_timeout=$1
+            shift
+            ;;
+        --worker-poll-interval)
+            worker_poll_interval=$1
+            shift
+            ;;
+        --terminate-file)
+            terminate_file=$1
+            shift
+            ;;
+        *)
+            args=("${args[@]}" "${key}")
+            ;;
+    esac
+done
+
+function cleanup() {
+    echo "Killing background processes for ${worker_name}"
+    if [[ -f "${worker_pid_file}" ]]; then
+        local wpid=$(cat "${worker_pid_file}")
+        kill "$wpid"
+    fi
+    exit 0
+}
+trap cleanup INT TERM EXIT
+
+echo "Determining worker ${worker_name} IP address..."
+. $DIR/determine_ip.sh $container_engine
+
+if [[ -n ${worker_dir} ]] ; then
+    mkdir -p ${worker_dir}
+fi
+
+# start worker in background
+echo "Run: dask worker \
+    --host ${local_ip} \
+    --local-directory ${worker_dir} \
+    ${args[@]} \
+    ${scheduler_address}"
+dask worker \
+    --host ${local_ip} \
+    --local-directory ${worker_dir} \
+    "${args[@]}" \
+    ${scheduler_address} \
+    2> >(tee ${worker_dir}/${worker_name}.log >&2) \
+    &
+
+# wait for PID file
+# make sure there is a timeout param since the default wait does not timeout
+${DIR}/waitforanyfile.sh 0 "${terminate_file},${worker_pid_file}" ${worker_start_timeout}
+
+if [[ -e "${worker_pid_file}" ]] ; then
+    worker_pid=$(cat "${worker_pid_file}")
+    echo "Worker ${worker_name} started: pid=$worker_pid"
+
+    # worker was started - wait until terminate
+    echo "Worker ${worker_name} - wait for termination event: ${terminate_file}"
+    ${DIR}/waitforanyfile.sh ${worker_pid} "${terminate_file}"
+else
+    echo "Worker ${worker_name} pid file not found"
+    worker_pid=0
+fi
diff --git a/dask/2023.10.1-py11-ol9/scripts/waitforanyfile.sh b/dask/2023.10.1-py11-ol9/scripts/waitforanyfile.sh
new file mode 100644
index 00000000..5d9fc6f8
--- /dev/null
+++ b/dask/2023.10.1-py11-ol9/scripts/waitforanyfile.sh
@@ -0,0 +1,43 @@
+#!/bin/bash
+# Wait for the specified file to be created
+#
+declare apid=$1
+declare fs_string=$2
+declare -i wait_timeout=${3:--1} # default to no timeout
+declare -i polling_interval=${4:-5} # default to 5s
+
+IFS=',' read -ra fs_list <<< "${fs_string}"
+
+if (( ${wait_timeout} > 0 )); then
+    echo "Check for process=${apid} and files: ${fs_list[@]} with a timeout of ${wait_timeout} seconds (polling interval = ${polling_interval}s)"
+else
+    echo "Check for process=${apid} and files: ${fs_list[@]} with NO timeout (polling interval = ${polling_interval}s)"
+fi
+
+declare -i seconds=0
+while true ; do
+
+    if [[ "${apid}" != "0" ]] ; then
+        # if apid is set check that the process is alive
+        if ! kill -0 $apid >/dev/null 2>&1 ; then
+            echo "Process $apid died"
+            exit 1
+        fi
+    fi
+
+    # check timeout
+    if (( ${wait_timeout} > 0 && ${seconds} > ${wait_timeout} )); then
+        echo "Timed out after ${seconds} seconds while waiting for ${fs_list[@]}"
+        exit 2
+    fi
+
+    for f in "${fs_list[@]}" ; do
+        if [[ -e "${f}" ]] ; then
+            echo "Found ${f}"
+            exit 0
+        fi
+    done
+
+    sleep $polling_interval
+    seconds=$(( ${seconds} + ${polling_interval} ))
+done
diff --git a/dask/2023.10.1-py11-ol9/scripts/waitformanager.sh b/dask/2023.10.1-py11-ol9/scripts/waitformanager.sh
new file mode 100644
index 00000000..83ce18ba
--- /dev/null
+++ b/dask/2023.10.1-py11-ol9/scripts/waitformanager.sh
@@ -0,0 +1,33 @@
+#!/bin/bash
+# Wait for the specified file to be created
+#
+DIR=$(cd "$(dirname "$0")"; pwd)
+
+flist=
+scheduler_start_timeout=60
+scheduler_poll_interval=2
+args=()
+
+while [[ $# -gt 0 ]]; do
+    key="$1"
+    shift # past the key
+    case $key in
+        --flist)
+            flist=$1
+            shift
+            ;;
+        --scheduler-start-timeout)
+            scheduler_start_timeout=$1
+            shift
+            ;;
+        --scheduler-poll-interval)
+            scheduler_poll_interval=$1
+            shift
+            ;;
+        *)
+            # unknown params
+            ;;
+    esac
+done
+
+$DIR/waitforanyfile.sh "0" "$flist" ${scheduler_start_timeout} ${scheduler_poll_interval}
diff --git a/dask/2023.10.1-py11-ol9/scripts/waitforworkers.sh b/dask/2023.10.1-py11-ol9/scripts/waitforworkers.sh
new file mode 100644
index 00000000..a2961089
--- /dev/null
+++ b/dask/2023.10.1-py11-ol9/scripts/waitforworkers.sh
@@ -0,0 +1,89 @@
+#!/bin/bash -ue
+# Wait until the required number of Dask workers have registered with the scheduler
+DIR=$(cd "$(dirname "$0")"; pwd)
+
+scheduler_address=
+cluster_work_dir=
+terminate_file=
+declare -i worker_start_timeout=-1
+declare -i worker_poll_interval=1
+declare -i total_workers=0
+declare -i required_workers=0
+
+while [[ $# -gt 0 ]]; do
+    key="$1"
+    shift # past the key
+    case $key in
+        --cluster-work-dir)
+            cluster_work_dir=$1
+            shift
+            ;;
+        --worker-start-timeout)
+            worker_start_timeout=$(($1))
+            shift
+            ;;
+        --worker-poll-interval)
+            worker_poll_interval=$(($1))
+            shift
+            ;;
+        --total-workers)
+            total_workers=$(($1))
+            shift
+            ;;
+        --required-workers)
+            required_workers=$(($1))
+            shift
+            ;;
+        --terminate-file)
+            terminate_file=$1
+            shift
+            ;;
+        --scheduler-address)
+            scheduler_address=$1
+            shift
+            ;;
+        *)
+            ;;
+    esac
+done
+
+if (( ${required_workers} > 0 )); then
+    seconds=0
+    while true; do
+        if [[ -e ${terminate_file} ]] ; then
+            # this can happen if the cluster is created on LSF and the workers cannot get nodes
+            # before the cluster is ended
+            available_workers=-1
+            exit 1
+        fi
+        available_workers=0
+        for (( worker_id=1; worker_id<=${total_workers}; worker_id++ )); do
+            worker_name="worker-${worker_id}"
+            worker_log="${cluster_work_dir}/${worker_name}/${worker_name}.log"
+            # if worker's log exists check if the worker has connected to the scheduler
+            echo "Check ${worker_log}"
+            if [[ -e "${worker_log}" ]]; then
+                found=`grep -o "Registered to:.*${scheduler_address}" ${worker_log} || true`
+                if [[ ! -z ${found} ]]; then
+                    echo "${found}"
+                    available_workers=$(( ${available_workers} + 1 ))
+                fi
+            fi
+        done
+        echo "Found ${available_workers} workers after ${seconds}s"
+        # in case somebody forgets to adjust the required workers check also if it is equal to total_workers
+        if (( ${available_workers} >= ${total_workers} || ${available_workers} >= ${required_workers} )); then
+            echo "Found ${available_workers} connected workers"
+            break;
+        fi
+        if (( ${worker_start_timeout} > 0 && ${seconds} > ${worker_start_timeout} )); then
+            echo "Timed out after ${seconds} seconds while waiting for at least ${required_workers} workers to connect to scheduler"
+            available_workers=-1
+            exit 2
+        fi
+        sleep ${worker_poll_interval}
+        seconds=$(( ${seconds} + ${worker_poll_interval} ))
+    done
+else
+    available_workers=-1
+fi