Creating a Custom Pipeline
This tutorial walks through creating a custom pipeline using the high-level tasks provided by the package. Custom pipelines are not officially supported, but it is anticipated that advanced users will want to create custom functionality regardless.
Creating a pipeline
This section describes how custom functionality can be added to the package. The top-level interface consists of management classes and task functions.
Classes
PipelineConfig: This class is used to store all the configuration information and metadata needed by the pipeline.
ProcessingSetManager: This class manages the on-disk data and I/O operations.
Swiftly: This class manages and coordinates swiftly FFT operations (subgrids and facets).
Gridder: This class is used to create (de)gridding tasks.
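In a pipeline these objects are usually obtained together from configure_and_setup_pipeline rather than constructed individually. The sketch below simply unpacks its return value (the order matches the full example later in this tutorial) and annotates which class each object belongs to; "config.yaml" is a hypothetical placeholder for a real configuration file.

from pathlib import Path

from ska_sdp_distributed_self_cal_prototype.workflow.tasks import (
    configure_and_setup_pipeline,
)

# "config.yaml" is a hypothetical placeholder for a real configuration file.
config_filepath = Path("config.yaml")

# Unpacking order matches the full pipeline example later in this tutorial.
(
    pipeline_config,         # PipelineConfig: configuration and metadata
    processing_set_manager,  # ProcessingSetManager: on-disk data and I/O
    swiftly_manager,         # Swiftly: swiftly FFT operations (subgrids and facets)
    gridding_manager,        # Gridder: (de)gridding tasks
    dask_client,             # Dask client coordinating distributed execution
) = configure_and_setup_pipeline(config_filepath)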
Tasks
High-level tasks are defined in the tasks module. Custom functionality should be implemented as functions in tasks.py.
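As an illustration of the pattern, the sketch below defines a trivial task. The function name, its use of numpy, and its behaviour are assumptions made for this example and are not part of the package.

import numpy as np


def clip_negative_pixels(image: np.ndarray) -> np.ndarray:
    """Hypothetical task: set negative pixel values to zero.

    Illustrative only; real tasks should follow the conventions of the
    existing functions in tasks.py.
    """
    return np.clip(image, 0.0, None)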
Custom gridders
Custom gridding kernels can be added, provided their gridding API is compatible with the W-towers kernel.
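As a rough sketch of what a compatible kernel might look like, the skeleton below shows the general shape of a gridding/degridding pair. The class and method names here are assumptions for illustration; the actual required interface is defined by the W-towers kernel used by the Gridder class.

import numpy as np


class CustomGriddingKernel:
    """Hypothetical kernel skeleton; class and method names are illustrative."""

    def grid(
        self, subgrid: np.ndarray, uvw: np.ndarray, visibilities: np.ndarray
    ) -> np.ndarray:
        """Accumulate visibilities onto the subgrid (gridding)."""
        raise NotImplementedError

    def degrid(self, subgrid: np.ndarray, uvw: np.ndarray) -> np.ndarray:
        """Predict visibilities from the subgrid (degridding)."""
        raise NotImplementedError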
Adding the pipeline
Once a custom pipeline has been created, it needs to be added to the package so that it can be run. The following steps assume that the package has been installed in editable mode (for example, with pip install -e .); if it was not, the package must be reinstalled after making the changes below.
1. Add a new Python module containing the custom pipeline to the pipelines directory.
In this example, we create a file named custom_pipeline.py with the following contents:
from pathlib import Path

from ska_sdp_distributed_self_cal_prototype.logger import setup_logger
from ska_sdp_distributed_self_cal_prototype.workflow.tasks import (
    bin_data,
    configure_and_setup_pipeline,
    generate_facets_with_corrections,
    grid_visibilities,
    save_image,
)

logger = setup_logger(__name__)


def custom_pipeline(config_filepath: Path) -> None:
    """Custom pipeline that creates a dirty image then saves it.

    Args:
        config_filepath: Path to the configuration file used to set up
            the pipeline parameters.

    Returns:
        None
    """
    (
        pipeline_config,
        processing_set_manager,
        swiftly_manager,
        gridding_manager,
        dask_client,
    ) = configure_and_setup_pipeline(config_filepath)

    logger.info("Bin visibilities for subgrids")
    visibility_bins, binning_info = bin_data(processing_set_manager, dask_client)

    logger.info("Grid visibilities")
    swiftly_manager = grid_visibilities(
        visibility_bins, binning_info, swiftly_manager, gridding_manager
    )

    logger.info("Generate facets with corrections")
    dirty_image_facets = generate_facets_with_corrections(
        swiftly_manager, gridding_manager, binning_info["normalisation_factor"]
    )

    logger.info("Combine facets into single image")
    dirty_image = swiftly_manager.join_facets(dirty_image_facets)
    logger.info(dirty_image.shape)

    logger.info("Saving image...")
    save_image(
        pipeline_config, dirty_image, pipeline_config.output_filenames["dirty_image"]
    )
    logger.info("Done!")

    dask_client.close()
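Because the pipeline is an ordinary function taking a configuration file path, it can also be exercised directly during development, before it is wired into the package. In the snippet below, "config.yaml" is a hypothetical placeholder for a real configuration file.

from pathlib import Path

from ska_sdp_distributed_self_cal_prototype.workflow.pipelines.custom_pipeline import (
    custom_pipeline,
)

# "config.yaml" is a hypothetical placeholder for a real configuration file.
custom_pipeline(Path("config.yaml"))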
2. In src/ska_sdp_distributed_self_cal_prototype/__main__.py, import the custom pipeline by adding the following line:
from ska_sdp_distributed_self_cal_prototype.workflow.pipelines.custom_pipeline import custom_pipeline
3. Then add the custom pipeline to the defined_pipelines variable:
defined_pipelines: dict[str, Callable] = {
    "custom_pipeline": custom_pipeline,
    "dirty_image": dirty_image_pipeline,
    "dirty_beam_and_psf": psf_and_dirty_image,
    "clean_hogbom": clean_hogbom,
    "clean_hogbom_with_degridding": clean_hogbom_with_degridding,
    "continuum_imaging": continuum_imaging,
}
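For context, a pipeline registered in defined_pipelines is looked up by name and called with the configuration file path. The dispatch sketch below is an assumption that simplifies the package's actual argument handling; pipeline_name and config_filepath are hypothetical stand-ins for values parsed from the command line.

from pathlib import Path

# Hypothetical stand-ins for command-line arguments; the real values are
# parsed by __main__.py.
pipeline_name = "custom_pipeline"
config_filepath = Path("config.yaml")

# defined_pipelines is the dict shown above.
pipeline = defined_pipelines[pipeline_name]
pipeline(config_filepath)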
The custom pipeline can now be run on the command line. Details can be found in Running the Pipeline.