Creating a Custom Pipeline
==========================

This tutorial shows how to create a custom pipeline using the high-level tasks provided. Custom pipelines aren't officially supported, though it is anticipated that advanced users will want to create custom functionality regardless.

Creating a pipeline
-------------------

This section describes how custom functionality can be added to this package. The top-level interface consists of management classes and task functions.

Classes
~~~~~~~

- :class:`PipelineConfig`: This class stores all the configuration information and metadata needed by the pipeline.
- :class:`ProcessingSetManager`: This class manages the on-disk data and I/O operations.
- :class:`Swiftly`: This class manages and coordinates swiftly FFT operations (subgrids and facets).
- :class:`Gridder`: This class is used to create (de)gridding tasks.

Tasks
~~~~~

High-level tasks are defined in :doc:`../tasks`. Users who want to create custom functionality should implement it as functions in ``tasks.py``.

Custom gridders
~~~~~~~~~~~~~~~

It is possible to add custom gridding kernels, though the gridding API must be compatible with the W-towers kernel.

Adding the pipeline
-------------------

Once a custom pipeline has been created, it needs to be added to the package so it can be run. The following steps assume that the package has been installed in editable mode. If this isn't the case, the package must be reinstalled after making these changes.

1. Add a new Python module containing the custom pipeline to the ``pipelines`` directory. In this example, we create a file named ``custom_pipeline.py`` with the following contents:

   .. code-block:: python

      from pathlib import Path

      from ska_sdp_distributed_self_cal_prototype.logger import setup_logger
      from ska_sdp_distributed_self_cal_prototype.workflow.tasks import (
          bin_data,
          configure_and_setup_pipeline,
          generate_facets_with_corrections,
          grid_visibilities,
          save_image,
      )

      logger = setup_logger(__name__)


      def custom_pipeline(config_filepath: Path) -> None:
          """Custom pipeline that creates a dirty image then saves it.

          Args:
              config_filepath: Path to the configuration file used to set up
                  the pipeline parameters.

          Returns:
              None
          """
          # Build the configuration, the manager objects and the Dask client.
          (
              pipeline_config,
              processing_set_manager,
              swiftly_manager,
              gridding_manager,
              dask_client,
          ) = configure_and_setup_pipeline(config_filepath)

          logger.info("Bin visibilities for subgrids")
          visibility_bins, binning_info = bin_data(processing_set_manager, dask_client)

          logger.info("Grid visibilities")
          swiftly_manager = grid_visibilities(
              visibility_bins, binning_info, swiftly_manager, gridding_manager
          )

          logger.info("Generate facets with corrections")
          dirty_image_facets = generate_facets_with_corrections(
              swiftly_manager, gridding_manager, binning_info["normalisation_factor"]
          )

          logger.info("Combine facets into single image")
          dirty_image = swiftly_manager.join_facets(dirty_image_facets)
          logger.info(dirty_image.shape)

          logger.info("Saving image...")
          save_image(
              pipeline_config,
              dirty_image,
              pipeline_config.output_filenames["dirty_image"],
          )

          logger.info("Done!")
          dask_client.close()

2. In ``src/ska_sdp_distributed_self_cal_prototype/__main__.py``, import the custom pipeline by adding the following line:

   .. code-block:: python

      from ska_sdp_distributed_self_cal_prototype.workflow.pipelines.custom_pipeline import (
          custom_pipeline,
      )

3. Then add the custom pipeline to the ``defined_pipelines`` variable:

   .. code-block:: python

      defined_pipelines: dict[str, Callable] = {
          "custom_pipeline": custom_pipeline,
          "dirty_image": dirty_image_pipeline,
          "dirty_beam_and_psf": psf_and_dirty_image,
          "clean_hogbom": clean_hogbom,
          "clean_hogbom_with_degridding": clean_hogbom_with_degridding,
          "continuum_imaging": continuum_imaging,
      }

4. The custom pipeline can now be run on the command line. Details can be found in :doc:`running_the_pipeline`.
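
The registration in step 3 works because ``defined_pipelines`` maps a pipeline name to a callable that takes the configuration file path. The following is a minimal, self-contained sketch of that lookup pattern and is not the package's actual ``__main__.py``: the ``run_pipeline`` helper, the ``calls`` list, the stand-in pipeline bodies and the ``config.yml`` filename are all assumptions made for illustration.

.. code-block:: python

   from pathlib import Path
   from typing import Callable

   # Records which stand-in pipeline ran (illustration only).
   calls: list[tuple[str, Path]] = []


   def custom_pipeline(config_filepath: Path) -> None:
       """Stand-in for the custom pipeline written in step 1."""
       calls.append(("custom_pipeline", config_filepath))


   def dirty_image_pipeline(config_filepath: Path) -> None:
       """Stand-in for an existing pipeline."""
       calls.append(("dirty_image", config_filepath))


   # Same shape as the registry in step 3: name -> callable.
   defined_pipelines: dict[str, Callable[[Path], None]] = {
       "custom_pipeline": custom_pipeline,
       "dirty_image": dirty_image_pipeline,
   }


   def run_pipeline(name: str, config_filepath: Path) -> None:
       """Hypothetical helper: look up a pipeline by name and run it."""
       try:
           pipeline = defined_pipelines[name]
       except KeyError:
           known = ", ".join(sorted(defined_pipelines))
           raise ValueError(
               f"Unknown pipeline {name!r}; expected one of: {known}"
           ) from None
       pipeline(config_filepath)

Under these assumptions, ``run_pipeline("custom_pipeline", Path("config.yml"))`` calls the newly registered function, while a name that is missing from the dictionary raises a ``ValueError`` listing the registered names. A pipeline that has not been added to ``defined_pipelines`` cannot be selected by name.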