Running with Multiple Dask Nodes on SLURM
This tutorial uses CSD3 to run a pipeline across multiple nodes. Any other HPC cluster that uses SLURM can be used instead, but the relevant changes will need to be made.
Create a YAML config file for the data you want to process; for this tutorial we will call it `example.yaml`:

```yaml
ps_dir: "/path/to/20B-356_Array_Kuband.ps"
dataset_key: "20B-356_Array_Kuband_0"
swiftly_config: "4k[1]-n2k-256"
output_dir: "/path/to/pipeline"
wtower_size: 10000
parallel_cleaning: true
niter: 10
dask_address: DASK_ADDRESS
```

The last line should be left as is; the remainder of the config should be altered to meet your requirements. In this case we have set the pipeline to run parallel Hogbom cleaning on a 4k image of `20B-356_Array_Kuband.ps` for the dataset key `20B-356_Array_Kuband_0`.
Next, create an environment setup script called `setup_environment.sh` to ensure the correct environment is loaded to run the pipeline:
```bash
#!/bin/bash

# Source bashrc to make module command available
source ${HOME}/.bashrc

# Load required base modules for icelake (includes Intel MPI 2021.6.0)
module purge
module load rhel8/default-icl

# Load conda
CONDA_PATH="${HOME}/miniconda3"
source $CONDA_PATH/etc/profile.d/conda.sh
conda activate self-cal

# Ensure conda env is on the path
PATH="${HOME}/miniconda3/envs/self-cal/bin:${PATH}"
LD_LIBRARY_PATH="${HOME}/miniconda3/envs/self-cal/lib/:${LD_LIBRARY_PATH}"
```

This script is used to ensure an identical environment, containing all required dependencies, on every SLURM node. If you can be sure that this is already the case in advance then the script can be empty; otherwise it should be modified to match your specific setup and needs.
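If you are unsure whether the environment really is identical everywhere, one quick sanity check (a minimal sketch; the node count, walltime, and script path are placeholders to adapt) is to source the setup script on each allocated node and compare the Python interpreter that each one resolves:

```bash
#!/bin/bash
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1
#SBATCH --time=00:05:00

# One task per node: each task sources the setup script and reports
# which Python it ends up with. The output should match on every node.
srun bash -c 'source ./setup_environment.sh; echo "$(hostname): $(which python)"'
```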
A SLURM job script called `run_pipeline.sh` is then created to run the `clean_hogbom` pipeline using the config file:
```bash
#!/bin/bash
#
# Example file for submitting multinode pipeline jobs on CSD3
# Adjust requested resources and job variables as necessary
#
#SBATCH --job-name=my-distvis-job
#SBATCH --nodes=6
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=76
#SBATCH --time=00:15:00
#SBATCH --exclusive
#SBATCH --partition=icelake-himem
#SBATCH --account=SKA-SDHP-SL2-CPU
#SBATCH --signal=B:TERM@180

##### User defined variables

##### Optional:
BASE_JOB_DIR=${HOME}/myjobdir
SCRIPT_PATH_FULL=$(scontrol show job "$SLURM_JOB_ID" | awk -F= '/Command=/{print $2}')
SCRIPT_DIR=$(dirname -- "${SCRIPT_PATH_FULL}")

##### Required:
INPUT_CONFIG=${SCRIPT_DIR}/example.yaml
WHICH_PIPELINE=clean_hogbom
BASE_OUTDIR=${BASE_JOB_DIR}/output
DASK_SCRATCH="/local"
DASK_WORKERS_PER_NODE=5
DASK_THREADS_PER_WORKER=4
DASK_MEMORY_PER_WORKER=100GB
DASK_PORT=8786

# Script to set up environment. Will be run on each node to ensure identical environments
# Put loading of modules and setup of conda env etc. in here as needed
# If you trust environment to already be set up and identical across nodes then this can be empty.
ENV_SETUP=${SCRIPT_DIR}/setup_environment.sh

# -----------------------------------------------------------------------------
# Anything below should not be edited
# -----------------------------------------------------------------------------

##### Setup environment, ensure correct modules loaded etc.
source ${ENV_SETUP}

TMP_DIR=$(mktemp -d -t ${SLURM_JOB_NAME}_XXXX)

set -x

##### Create output directories
PIPELINE_DIR=$BASE_OUTDIR/pipeline
DASK_LOGS_DIR=$BASE_OUTDIR/dask_logs
SYSMON_DIR=$BASE_OUTDIR/sysmon
SETUP_DIR=$BASE_OUTDIR/setup

mkdir -p $BASE_OUTDIR
mkdir -p $PIPELINE_DIR
mkdir -p $DASK_LOGS_DIR
mkdir -p $SYSMON_DIR
mkdir -p $SETUP_DIR

##### Fetch list of nodes
NODES=($(scontrol show hostnames))
HEAD_NODE="$(hostname)"
NODES_SPACE_SEPARATED="${NODES[*]}"
echo "Allocated nodes: ${NODES_SPACE_SEPARATED}"
echo "Head node: ${HEAD_NODE}"

##### Start dask scheduler on head node
DASK_SCHEDULER_ADDR="${HEAD_NODE}:${DASK_PORT}"
dask scheduler --port ${DASK_PORT} >"${DASK_LOGS_DIR}/scheduler_${HEAD_NODE}.log" 2>&1 &
echo "Started dask scheduler on ${DASK_SCHEDULER_ADDR}"

##### Write config required to run the pipeline
echo "Write yaml config with dask address"
WORKING_CONFIG="${TMP_DIR}/$(\basename -- ${INPUT_CONFIG})"
cat ${INPUT_CONFIG} | sed "s|DASK_ADDRESS|${DASK_SCHEDULER_ADDR}|" > ${WORKING_CONFIG}

#### Start system monitor on ALL nodes
#### Start dask workers on all nodes (even on head node, it's fine)
for node in "${NODES[@]}"; do
    echo "Start dask and system monitor on ${node}"
    outfile=${SYSMON_DIR}/"system_usage_$node.json"
    logfile=${DASK_LOGS_DIR}/worker_${node}.log

    ssh "${node}" /bin/bash << EOF &
# Force environment to be the same on all nodes
# And log some basic data on each node
echo "setup node: ${node}" >> ${logfile};
source ${ENV_SETUP} >> ${logfile} 2>&1;
echo \${PATH} >> ${logfile} 2>&1;
echo \${LD_LIBRARY_PATH} >> ${logfile} 2>&1;

# Start worker
echo "start dask worker: ${node}" >> ${logfile} 2>&1;
nohup dask worker ${DASK_SCHEDULER_ADDR} --name "${node}" --local-directory ${DASK_SCRATCH} --nworkers ${DASK_WORKERS_PER_NODE} --nthreads ${DASK_THREADS_PER_WORKER} --memory-limit ${DASK_MEMORY_PER_WORKER} < /dev/null >>${logfile} 2>&1 &
echo "setup finished: ${node}" >> ${logfile} 2>&1;
EOF
done

##### Start pipeline
# "exec" so that SIGTERM propagates to the pipeline executable
echo "Launch pipeline"
exec ska_sdp_distributed_self_cal_prototype --pipeline-name ${WHICH_PIPELINE} --config-file ${WORKING_CONFIG}
```

This will run the `clean_hogbom` pipeline workflow using the settings from `example.yaml` for 15 minutes on 6 nodes, with 5 Dask workers per node and 4 threads per worker. All of these settings can be changed to meet your exact requirements.
The SLURM job script can then be submitted:
```bash
sbatch run_pipeline.sh
```
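Options passed to `sbatch` on the command line take precedence over the `#SBATCH` directives inside the script, so resources can be adjusted per submission without editing `run_pipeline.sh` (the values below are only an example):

```bash
# Request 4 nodes for 30 minutes instead of the defaults in the script
sbatch --nodes=4 --time=00:30:00 run_pipeline.sh

# Check the state of the job while it is queued or running
squeue -u $USER
```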
Once the job is complete, various output artefacts and logs will be created in the output directories set up by the job script.
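For example, assuming the default `BASE_JOB_DIR=${HOME}/myjobdir` from the job script above, the results can be inspected with something like:

```bash
# List everything written under the job's output tree
ls -R ${HOME}/myjobdir/output

# The Dask scheduler and worker logs are the first place to look
# when diagnosing problems with the distributed run
tail ${HOME}/myjobdir/output/dask_logs/*.log
```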