
Online Material

Nextflow is a workflow manager and domain-specific language (DSL) built to make workflows reproducible through modularization, abstraction, and integration with container technologies. The DSL is built on Groovy, but you do not need to know Groovy to use Nextflow. It is helpful for the more advanced functionality, but that is beyond the scope of this workshop. If you would like to see how Groovy can empower your workflows, please see the training documentation provided by nf-core!

Nextflow workflows follow the dataflow paradigm: data flows from one function to the next in first-in, first-out order. How that information flows is dictated by the workflow declaration. Each declaration consists of several key elements, including channels, processes, an executor, container calls, and workflow declarations (yes, workflows within workflows!). Each plays a different role in the portability and reproducibility of these workflows.
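To ground these terms before we dig in, here is a minimal sketch of a complete workflow. The process name, greeting, and parameter are illustrative, not part of any real pipeline.

params.greeting = 'Hello world!'

process SayHello {
    input:
    val text

    output:
    stdout

    script:
    """
    echo '$text'
    """
}

workflow {
    // a channel feeds the process; view prints whatever the process emits
    Channel.of(params.greeting) | SayHello | view
}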

Let us begin our discussion with the workflow's lifeblood, channels.

Channels

Channels are asynchronous, queue-like objects with first-in, first-out behavior and two essential properties:

  1. Sending a message to a channel is asynchronous (non-blocking).
  2. Receiving a message from a channel is synchronous (blocking).

This allows data to flow through the workflow unhindered while ensuring that Nextflow's functions don't attempt to run without the data in hand.

These objects come in two generic types: queue channels and value channels.

  • Queue channels are single-use channels and are consumed on use.
  • Value channels can be read any number of times and hold a single value (see the sketch below).
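A minimal illustration of the difference, using hypothetical file names:

// a value channel can be read by every task of every process,
// while a queue channel's items are consumed as they are read
reference_ch = Channel.value('genome.fa')      // reusable single value
reads_ch     = Channel.of('s1.fq', 's2.fq')    // consumed item by item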
tip

Channels make or break your workflow. Be very cognizant of your channel setup!

Channel Factories

Channels don't just spontaneously appear; they are created using channel factories. There are several channel factories, each dictating additional properties of the channel it creates.

empty

The empty factory generates a channel that emits no value, meaning it can accept data but does not share that data.

example_ch = Channel.empty()

of

The of factory creates a channel that emits whatever values you give it. This is a handy generic factory to use when you aren't sure what you'll be emitting from the channel.

example_ch = Channel.of(0,1,2,3)

fromPath

The fromPath factory creates a channel that emits one or more file paths. It doesn't matter if it's a single file, a group of files, a directory, or a group of directories. As long as it's a valid file path, the channel will emit it.

example_ch = Channel.fromPath('/example/path')

fromFilePairs

The fromFilePairs factory creates a channel that emits the file pairs matching a glob pattern (a wildcard pattern, not a full regular expression). As an example, assume you are processing a group of chest x-rays with left and right images for each subject. You can populate a channel with the matching glob pattern and pass the images as a pair to a process.

example_ch = Channel.fromFilePairs('*_{L,R}.xray')

fromSRA

The fromSRA factory is particularly useful for genomics workflows. It accepts an SRA accession (for example, an SRR number), queries the SRA database for it, and populates the channel with the FASTQ files related to that accession.

example_ch = Channel.fromSRA('SRR1254235')

watchPath

The watchPath factory watches a folder for one or more files matching a glob pattern. It essentially waits for a file to be created and then populates the channel with that file path.

example_ch = Channel.watchPath('/example/file.fasta')

fromList

The fromList factory creates a channel that emits each element of the list you provide.

example_ch = Channel.fromList(['Go', 'Tigers', 'Win', 'That', 'Tournament'])

value

The value factory is the recommended way to generate a value channel. The value can be anything, from a file path to a single number.

example_ch = Channel.value('Go Tigers!')
tip

Any channel consisting of a SINGLE value, path, or object is treated as a value channel.

Operators

Channels are not static objects; they can be manipulated using what are called operators. Operators cover several key areas of manipulation, including filtering, reduction, text parsing, combining channels, forking, and simple mathematical operations. In this workshop we will demonstrate several of these, but by no means all; you are encouraged to consult the documentation.

tip

Note that Nextflow utilizes something called implicit parallelism: if a process or channel operator has everything it needs to run, it will run in parallel as resources allow. As such, when you run these commands, the order of the output will vary from what is shown below.

branch : Split a channel into multiple channels based on matching criteria.

result = Channel.of(500, 700, "tigers", "this", 1001, 1235)
    .branch {
        nan  : it == "this"
        nan2 : it == "tigers"
        num  : it < 1000
        num2 : it > 1000
    }

result.nan.view  { "$it is not a number" }
result.num.view  { "$it is a number less than 1000" }
result.nan2.view { "$it is not a number" }
result.num2.view { "$it is a number greater than 1000" }

output:

500 is a number less than 1000
1001 is a number greater than 1000
this is not a number
1235 is a number greater than 1000
700 is a number less than 1000
tigers is not a number

sum and set : Sum all the items in the channel and set the result to a new channel.

Channel.of(132508, 2359730, 3285908392085, 320, 0.333, 0.3532432, 23430.3)
    .sum()
    .set { result2 }
result2.view { "The sum of the channel is $it" }

output:

The sum of the channel is 3285910908073.9862432

max : Identify the maximum value of the channel.

Channel.of(132508, 2359730, 3285908392085, 320, 0.333, 0.3532432, 23430.3)
    .max()
    .view { "Max value : $it" }

output:

Max value : 3285908392085

join : Creates a channel that joins together the items emitted by two other channels that share a matching key. By default, the first element of each tuple is used as the key. Also note that entries with no matching pair are not reported in the view statement.

primary_ch = Channel.of(['Tigers',1],['Clemson',1889],['CCIT',1000],['CAFLS',1893])
secondary_ch = Channel.of(['Tigers',5],['Clemson',2018],['CAFLS',2018])
primary_ch.join(secondary_ch).view()

output:

[Tigers, 1, 5]
[Clemson, 1889, 2018]
[CAFLS, 1893, 2018]

distinct : Removes consecutive duplicate items from a channel, so that each emitted item differs from the preceding one.

repeat_ch = Channel.of('Go','Go','Go','Tigers','Tigers','Tigers','win!','win!','win!','Go','Go','Tigers','Tigers')
repeat_ch.distinct().view()

output:

Go
Tigers
win!
Go
Tigers

randomSample : A pseudo-random sampler that emits a fixed number of randomly selected items from the channel. Note that in this example the second argument, 234, is the random seed; setting it is critical for reproducibility.

Channel.of(1..1000)
    .randomSample(7, 234)
    .view()

output:

1
5
100
254
678
999
672

Processes

Processes are the functions of Nextflow. They consist of four parts: the process declaration, input channel declarations, output channel declarations, and a script block. The script block must always come at the end of the process declaration.

For each discrete data point entering a process, Nextflow spins up what is called a task. Each task is an instance of the process, and tasks run in parallel with one another. Tasks are isolated and do not share write state, which enables parallel I/O; their only means of communication is channels. Another thing to keep in mind is that processes do not share configuration: if you load a module in one process, it will not persist to the next. You either have to declare such things explicitly in each process or set them in a configuration file.

process DoSomething {
    input:
    path x

    output:
    path 'result.txt'

    script:
    """
    Rscript DoSomething.R $x result.txt
    """
}
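To illustrate that configuration does not persist between processes, a directive such as module must be declared in every process that needs it. A sketch, assuming a hypothetical r/4.2.2 module and Summarize.R script on your cluster:

process DoSomethingElse {
    module 'r/4.2.2'    // loaded only for this process's tasks

    input:
    path x

    output:
    path 'summary.txt'

    script:
    """
    Rscript Summarize.R $x summary.txt
    """
}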

Executors

An executor defines the environment in which the computation is executed. Executors are abstractions, or simplifications, that enable greater portability between HPC systems. If you develop a workflow on AWS Batch and want to run it on a system using Slurm, you should only have to modify the configuration file to port it between the two.

These abstractions are powerful. They not only negotiate with the scheduler on your behalf, they also generate the batch scripts for the jobs Nextflow spins up for you.

tip

If you define an executor, each task will be submitted as a new job. So please be cognizant of the number of jobs you spin up, and how often, by using the correct executor configuration.

executor {
    name = 'slurm'
    queueSize = 4
    pollInterval = '30 sec'
}

Config files

Config files define the configuration of the workflow. These configuration files consist of parameter declarations, container calls, executor definitions, resource requests, module loads, etc.

params.str = "Hello world!"

process {
    executor = 'slurm'
    cpus = 4
    memory = 10.GB
    module = ['r/4.2.2', 'anaconda3']
}

executor {
    queueSize = 4
    pollInterval = '30 sec'
}

Nextflow File System and Resume

Before discussing workflow and subworkflow declarations, let's take a step aside and discuss the working file system that Nextflow uses. For each task generated, Nextflow generates a unique hash. This hash is then used to create a subdirectory in a directory called "work". In this subdirectory it stores any and all files related to that task, including any and all results your analysis generates. You must use the publishDir directive in your process declarations to ensure that your results are written to another directory (see the sketch after the figure). See the figure below for a visual breakdown of a work directory.

work/
├── 38
│   └── 7326dfe49eea2ca192015b4752cb54
│   └── moving_picture_tutorial -> /home/zigerst/scripts/nextflow_tests/work/f5/9ae0c094ed30ea3f83b632a6dc9c9b/moving_picture_tutorial
├── 53
│   └── a903dd35193d06feab1cd2c226adc3
│   └── moving_picture_tutorial -> /home/zigerst/scripts/nextflow_tests/work/f5/9ae0c094ed30ea3f83b632a6dc9c9b/moving_picture_tutorial
├── b9
│   └── 4277ef702247713182d8a632dc572b
│   └── moving_picture_tutorial -> /home/zigerst/scripts/nextflow_tests/work/f5/9ae0c094ed30ea3f83b632a6dc9c9b/moving_picture_tutorial
├── dc
│   └── 2a839d95100393c2f1c0c462115e73
│   └── moving_picture_tutorial -> /home/zigerst/scripts/nextflow_tests/work/f5/9ae0c094ed30ea3f83b632a6dc9c9b/moving_picture_tutorial
└── f5
└── 9ae0c094ed30ea3f83b632a6dc9c9b
├── moving_picture_tutorial
│   ├── aligned-rep-seqs.qza
│   ├── demux-details.qza
│   ├── demux.qza
│   ├── emp-single-end-sequences
│   │   ├── barcodes.fastq.gz
│   │   └── sequences.fastq.gz
│   ├── emp-single-end-sequences.qza
│   ├── masked-aligned-rep-seqs.qza
│   ├── rep-seqs.qza
│   ├── rooted-tree.qza
│   ├── sample_metadata.tsv
│   ├── stats.qza
│   ├── table.qza
│   └── unrooted-tree.qza
└── test.sh -> /home/zigerst/scripts/nextflow_tests/scripts/test.sh
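As noted above, the publishDir directive is what copies results out of the work directory. A minimal sketch extending the earlier DoSomething process; the results directory name is illustrative:

process DoSomething {
    publishDir 'results/', mode: 'copy'    // copy task outputs to ./results

    input:
    path x

    output:
    path 'result.txt'

    script:
    """
    Rscript DoSomething.R $x result.txt
    """
}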

Now, this may seem like a major drawback, but it's done for a reason: the hash system enables Nextflow's resume functionality. If the workflow is interrupted and restarted, Nextflow checks for the latest recorded hash that CORRESPONDS to the latest hash directory in work, and then resumes the workflow from there. So it's important to preserve the .nextflow/cache directory as well as the work directory while you are working.
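Resuming is a single flag at the command line; the script name here is hypothetical:

nextflow run main.nf -resume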

tip

Once you finish your work and have verified your results are saved to a permanent location, please make sure to erase the work directory! Work directories eat up space quickly when left unattended.
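For example, after your results are published elsewhere, you can delete the directory by hand or use Nextflow's built-in cleaner (see nextflow clean -h for its options):

rm -rf work/        # remove everything by hand
# or let Nextflow remove the files it tracked:
nextflow clean -f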

Workflows, Modularity and Subworkflows

The workflow declaration dictates the order in which the defined processes are executed. Nextflow utilizes 'lazy' execution, meaning it does not pre-generate a DAG from the declaration. Rather, it parses the workflow logic and runs whatever can be run. This is why your channel setup is so critical in Nextflow workflows: which channels are populated, and which processes expect them, dictates what will execute first.

Workflows can call elements from other .nf files. This modularity allows you to reuse processes developed for other workflows without having to rewrite them, and it also enables binary reuse within a workflow. Additionally, you can call other workflows within a workflow, allowing you to reuse entire workflows for novel situations.

// Workflow declarations
workflow letters {
    splitLetters | flatten | capitalizeLetters | view { it.trim() }
}

workflow qiime {
    prepare_data(params.init)
    init_qiime(prepare_data.out)
    demux(init_qiime.out)
    denoise(demux.out)
    phylogeny(denoise.out)
}

// Calling modules
include { Someprocess } from './somefile.nf'

// Calling subworkflows
workflow {
    letters()
    qiime()
}

Running Nextflow on Palmetto2

module load nextflow
module load openjdk/11.0.20.1_1
nextflow run <file.nf>

note

Explaining container functionality is beyond the scope of this workshop.