Running with Multiple Dask Nodes on SLURM

This tutorial uses CSD3 to run a pipeline across multiple nodes. Any other HPC cluster that uses SLURM can be used instead, provided the cluster-specific settings (modules, partition, account and resource requests) are adjusted accordingly.

  1. Create a YAML config file for the data you want to process. For this tutorial we will call it example.yaml:

ps_dir: "/path/to/20B-356_Array_Kuband.ps"
dataset_key: "20B-356_Array_Kuband_0"
swiftly_config: "4k[1]-n2k-256"
output_dir: "/path/to/pipeline"
wtower_size: 10000
parallel_cleaning: true
niter: 10
dask_address: DASK_ADDRESS

Leave the last line as is: the job script replaces the DASK_ADDRESS placeholder with the address of the Dask scheduler at run time. Alter the rest of the config to meet your requirements. In this case the pipeline is set to run parallel Hogbom cleaning on a 4k image of 20B-356_Array_Kuband.ps for the key 20B-356_Array_Kuband_0.
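
The placeholder substitution can be sketched on its own as follows. The node name `cpu-q-1`, the helper name `substitute_dask_address` and the temporary files are assumptions made for illustration; only the `sed` pattern mirrors what the job script does.

```shell
#!/bin/bash
# Sketch: how the DASK_ADDRESS placeholder is filled in at job start.
# The node name and temporary files below are illustrative assumptions.

substitute_dask_address() {
    # $1: input config, $2: scheduler address (host:port), $3: output config
    sed "s|DASK_ADDRESS|$2|" "$1" > "$3"
}

tmp_dir=$(mktemp -d)
printf 'niter: 10\ndask_address: DASK_ADDRESS\n' > "${tmp_dir}/example.yaml"

substitute_dask_address "${tmp_dir}/example.yaml" "cpu-q-1:8786" "${tmp_dir}/working.yaml"

# Show the rewritten config; the original file is left untouched
cat "${tmp_dir}/working.yaml"
```

Because the substitution writes to a separate working copy, the same example.yaml can be reused for every job without editing.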

  1. Create an environment setup script called setup_environment.sh that loads the correct environment for running the pipeline:

#!/bin/bash

# Source bashrc to make module command available
source ${HOME}/.bashrc

# Load required base modules for icelake (includes Intel MPI 2021.6.0)
module purge
module load rhel8/default-icl

# Load conda
CONDA_PATH="${HOME}/miniconda3"
source $CONDA_PATH/etc/profile.d/conda.sh
conda activate self-cal

# Ensure conda env is on the path
PATH="${HOME}/miniconda3/envs/self-cal/bin:${PATH}"
LD_LIBRARY_PATH="${HOME}/miniconda3/envs/self-cal/lib/:${LD_LIBRARY_PATH}"

This script ensures an identical environment, containing all required dependencies, across all SLURM nodes. If you can be sure in advance that this is already the case, the script can be empty. Otherwise, modify it to match your specific setup.
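
Before submitting a job, it can help to confirm that the environment script really exposes the executables the job script calls. The following is a small sketch, assuming the two command names used in this tutorial (`dask` and `ska_sdp_distributed_self_cal_prototype`); the helper name `missing_commands` is hypothetical and not part of the pipeline.

```shell
#!/bin/bash
# Sketch: report which required commands are not on PATH.
# missing_commands is a hypothetical helper, not part of the pipeline.
missing_commands() {
    local cmd
    local missing=()
    for cmd in "$@"; do
        command -v "${cmd}" >/dev/null 2>&1 || missing+=("${cmd}")
    done
    # Print the missing names separated by spaces (empty if none)
    echo "${missing[*]}"
}

# Typical use, after running `source setup_environment.sh`:
missing=$(missing_commands dask ska_sdp_distributed_self_cal_prototype)
if [ -n "${missing}" ]; then
    echo "Not found on PATH: ${missing}"
else
    echo "All required commands found"
fi
```

Running this check on a compute node as well as on the login node gives some confidence that the environment is the same in both places.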

  1. Create a SLURM job script called run_pipeline.sh that starts the Dask cluster and runs the chosen pipeline (clean_hogbom here) using the config file:

#!/bin/bash
#
# Example file for submitting multinode pipeline jobs on CSD3
# Adjust requested resources and job variables as necessary
#
#SBATCH --job-name=my-distvis-job
#SBATCH --nodes=6
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=76
#SBATCH --time=00:15:00
#SBATCH --exclusive
#SBATCH --partition=icelake-himem
#SBATCH --account=SKA-SDHP-SL2-CPU
#SBATCH --signal=B:TERM@180


##### User defined variables
##### Optional:
BASE_JOB_DIR=${HOME}/myjobdir
SCRIPT_PATH_FULL=$(scontrol show job "$SLURM_JOB_ID" | awk -F= '/Command=/{print $2}')
SCRIPT_DIR=$(dirname -- "${SCRIPT_PATH_FULL}")
##### Required:
INPUT_CONFIG=${SCRIPT_DIR}/example.yaml
WHICH_PIPELINE=clean_hogbom
BASE_OUTDIR=${BASE_JOB_DIR}/output
DASK_SCRATCH="/local"
DASK_WORKERS_PER_NODE=5
DASK_THREADS_PER_WORKER=4
DASK_MEMORY_PER_WORKER=100GB
DASK_PORT=8786
# Script to set up environment. Will be run on each node to ensure identical environments
# Put loading of modules and setup of conda env etc. in here as needed
# If you trust environment to already be set up and identical across nodes then this can be empty.
ENV_SETUP=${SCRIPT_DIR}/setup_environment.sh


# -----------------------------------------------------------------------------
# Anything below should not be edited
# -----------------------------------------------------------------------------

##### Setup environment, ensure correct modules loaded etc.
source ${ENV_SETUP}
TMP_DIR=$(mktemp -d -t ${SLURM_JOB_NAME}_XXXX)
set -x


##### Create output directories
PIPELINE_DIR=$BASE_OUTDIR/pipeline
DASK_LOGS_DIR=$BASE_OUTDIR/dask_logs
SYSMON_DIR=$BASE_OUTDIR/sysmon
SETUP_DIR=$BASE_OUTDIR/setup
mkdir -p $BASE_OUTDIR
mkdir -p $PIPELINE_DIR
mkdir -p $DASK_LOGS_DIR
mkdir -p $SYSMON_DIR
mkdir -p $SETUP_DIR


##### Fetch list of nodes
NODES=($(scontrol show hostnames))
HEAD_NODE="$(hostname)"
NODES_SPACE_SEPARATED="${NODES[*]}"
echo "Allocated nodes: ${NODES_SPACE_SEPARATED}"
echo "Head node: ${HEAD_NODE}"


##### Start dask scheduler on head node
DASK_SCHEDULER_ADDR="${HEAD_NODE}:${DASK_PORT}"
dask scheduler --port ${DASK_PORT} >"${DASK_LOGS_DIR}/scheduler_${HEAD_NODE}.log" 2>&1 &
echo "Started dask scheduler on ${DASK_SCHEDULER_ADDR}"


##### Write config required to run the pipeline
echo "Write yaml config with dask address"
WORKING_CONFIG="${TMP_DIR}/$(\basename -- ${INPUT_CONFIG})"
sed "s|DASK_ADDRESS|${DASK_SCHEDULER_ADDR}|" "${INPUT_CONFIG}" > "${WORKING_CONFIG}"


#### Start system monitor on ALL nodes
#### Start dask workers on all nodes (even on head node, it's fine)
for node in "${NODES[@]}"; do
    echo "Start dask and system monitor on ${node}"
    outfile=${SYSMON_DIR}/"system_usage_$node.json"
    logfile=${DASK_LOGS_DIR}/worker_${node}.log
    ssh "${node}" /bin/bash << EOF &
        # Force environment to be the same on all nodes
        # And log some basic data on each node
        echo "setup node: ${node}" >> ${logfile};
        source ${ENV_SETUP} >> ${logfile} 2>&1;
        echo \${PATH} >> ${logfile} 2>&1;
        echo \${LD_LIBRARY_PATH} >> ${logfile} 2>&1;

        # Start worker
        echo "start dask worker: ${node}" >> ${logfile} 2>&1;
        nohup dask worker ${DASK_SCHEDULER_ADDR} --name "${node}" --local-directory ${DASK_SCRATCH} --nworkers ${DASK_WORKERS_PER_NODE} --nthreads ${DASK_THREADS_PER_WORKER} --memory-limit ${DASK_MEMORY_PER_WORKER} < /dev/null  >>${logfile} 2>&1 &
        echo "setup finished: ${node}" >> ${logfile} 2>&1;
EOF
done


##### Start pipeline
# "exec" so that SIGTERM propagates to the pipeline executable
echo "Launch pipeline"
exec ska_sdp_distributed_self_cal_prototype  --pipeline-name ${WHICH_PIPELINE}  --config-file ${WORKING_CONFIG}

This will run the clean_hogbom pipeline workflow using the settings from example.yaml on 6 nodes, with 5 workers per node and 4 threads per worker, within a 15-minute time limit. All of these values can be changed to meet your requirements.
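
When changing these values, it is worth checking that the Dask settings fit inside the SLURM request. The arithmetic for the values used in this tutorial can be sketched as below. The variable names mirror the job script, but the check itself is an illustration and not part of the pipeline.

```shell
#!/bin/bash
# Sketch: sanity-check Dask worker settings against the SLURM request.
NODES=6
CPUS_PER_TASK=76
DASK_WORKERS_PER_NODE=5
DASK_THREADS_PER_WORKER=4
DASK_MEMORY_PER_WORKER_GB=100   # numeric form of DASK_MEMORY_PER_WORKER=100GB

threads_per_node=$((DASK_WORKERS_PER_NODE * DASK_THREADS_PER_WORKER))
total_workers=$((NODES * DASK_WORKERS_PER_NODE))
total_threads=$((NODES * threads_per_node))
memory_per_node_gb=$((DASK_WORKERS_PER_NODE * DASK_MEMORY_PER_WORKER_GB))

echo "Workers: ${total_workers}, threads: ${total_threads}"
echo "Per node: ${threads_per_node} threads, ${memory_per_node_gb} GB Dask memory limit"

# Warn if the workers would oversubscribe the CPUs requested on each node
if [ "${threads_per_node}" -gt "${CPUS_PER_TASK}" ]; then
    echo "Warning: more Dask threads than requested CPUs per node"
fi
```

With the tutorial values this gives 30 workers and 120 threads in total, using 20 of the 76 requested CPUs and a combined Dask memory limit of 500 GB on each node. Compare the memory figure against what the nodes in your partition actually provide.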

  1. The SLURM job script can then be submitted:

sbatch run_pipeline.sh
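
If you want to monitor the job from a script, one option is to capture the job ID at submission. The sketch below relies on `sbatch --parsable`, which prints the job ID optionally followed by `;` and a cluster name; the helper name `job_id_from_parsable` is hypothetical.

```shell
#!/bin/bash
# Sketch: capture the job ID at submission so the job can be monitored.
# job_id_from_parsable is a hypothetical helper name.
job_id_from_parsable() {
    # `sbatch --parsable` prints "<jobid>" or "<jobid>;<cluster>"
    echo "${1%%;*}"
}

if command -v sbatch >/dev/null 2>&1; then
    job_id=$(job_id_from_parsable "$(sbatch --parsable run_pipeline.sh)")
    echo "Submitted job ${job_id}"
    squeue -j "${job_id}"    # queue state while the job is pending or running
else
    echo "sbatch not found; run this on a SLURM login node"
fi
```

Unless an output file is requested explicitly, SLURM writes the job script's own output to slurm-<jobid>.out in the directory the job was submitted from.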

  1. Once the job is complete, the output artefacts and logs can be found in the output directories under BASE_OUTDIR (pipeline, dask_logs, sysmon and setup).
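
A quick way to confirm that the job got far enough to create its output directories is sketched below. The directory names and the default location are taken from run_pipeline.sh; the helper name `check_output_dirs` is hypothetical.

```shell
#!/bin/bash
# Sketch: list which expected output directories exist under BASE_OUTDIR.
# check_output_dirs is a hypothetical helper; directory names come from the job script.
check_output_dirs() {
    local base="$1"
    local name
    local status=0
    for name in pipeline dask_logs sysmon setup; do
        if [ -d "${base}/${name}" ]; then
            echo "found:   ${base}/${name}"
        else
            echo "missing: ${base}/${name}"
            status=1
        fi
    done
    return "${status}"
}

# Default location used in run_pipeline.sh
BASE_OUTDIR="${HOME}/myjobdir/output"
check_output_dirs "${BASE_OUTDIR}" || echo "Some output directories are missing"
```

If the pipeline fails to start, the scheduler and worker logs in dask_logs (scheduler_<node>.log and worker_<node>.log) are a reasonable first place to look.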