Online Material
Nextflow is a workflow manager and domain-specific language (DSL) built to make workflows reproducible through modularization, abstraction, and integration with container technologies. The DSL is built on Groovy, but you do not need to know Groovy to use Nextflow. It is helpful for more advanced functionality, though that is beyond the scope of this workshop. If you would like to see how Groovy can empower your workflows, please see the training documentation provided by nf-core!
Nextflow workflows follow the dataflow paradigm, meaning data flows from one function to the next with first-in, first-out behavior. How that information flows is dictated by the workflow declaration. Each declaration consists of several key elements, including channels, processes, an executor, container calls, and workflow declarations (yes, workflows within workflows!). Each plays a different role in the portability and reproducibility of these workflows.
Let us begin our discussion with the workflow's lifeblood, channels.
Channels
Channels are asynchronous, queue-like objects with first-in, first-out behavior and two essential properties:
- Sending to a channel is asynchronous, or non-blocking.
- Receiving from a channel is synchronous, or blocking.
This allows data to flow through the workflow unhindered while also ensuring that Nextflow's functions don't attempt to run without the data in hand.
These objects come in two generic types: queue channels and value channels.
- Queue channels are single-use channels and are consumed on use.
- Value channels are multi-use channels consisting of a single value.
Channels make or break your workflow. Be very cognizant of your channel setup!
Channel Factories
Channels don't just spontaneously appear; they are created using channel factories. There are roughly eight channel factories, each of which dictates additional properties of a channel.
empty
The empty factory generates a channel that emits no value, meaning it can accept data but does not share that data.
example_ch = Channel.empty()
of
The of channel emits whatever you give it. This is a handy generic factory to use when you aren't sure what you'll be emitting from the channel.
example_ch = Channel.of(0,1,2,3)
fromPath
The fromPath channel emits one or more file paths. It doesn't matter if it's a single file, a group of files, a directory, or a group of directories. As long as it's a valid file path, the channel will emit it.
example_ch = Channel.fromPath('/example/path')
fromFilePairs
The fromFilePairs channel emits file pairs that match a glob pattern (a wildcard matching pattern, not a full regular expression). As an example, assume you are processing a group of chest X-rays with left and right images for each subject. You can populate a channel with the matching glob pattern and pass each pair to a process.
example_ch = Channel.fromFilePairs('*_{L,R}.xray')
fromSRA
The fromSRA channel is particularly useful for genomics workflows. It accepts an SRA run accession (an SRR number), queries the SRA database for it, and then populates the channel with the FASTQ files related to that SRR number.
example_ch = Channel.fromSRA('SRR1254235')
watchPath
The watchPath channel watches a folder for one or more files matching a glob pattern. It essentially waits for a file to be generated and then populates the channel with that file path.
example_ch = Channel.watchPath('/example/file.fasta')
fromList
The fromList channel emits the values provided as a list of elements.
example_ch = Channel.fromList(['Go','Tigers','Win','That','Tournament'])
value
The value factory is the recommended way to generate a value channel. The value can be anything, from a file path to a single number or string.
example_ch = Channel.value(1)
Any channel consisting of a SINGLE value, path, or object is treated as a value channel.
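To make the distinction concrete, here is a minimal DSL2 sketch (the process COMBINE, its echo script, and the sample/reference values are hypothetical illustrations, not part of the material above; processes themselves are covered later in this document). The queue channel drives one task per element, while the value channel is reused by every task.

process COMBINE {
  input:
  val sample      // fed by a queue channel: one task per element
  val reference   // fed by a value channel: the same value is reused by every task

  output:
  stdout

  script:
  """
  echo "processing $sample against $reference"
  """
}

workflow {
  samples_ch = Channel.of('sampleA', 'sampleB', 'sampleC')  // queue channel, consumed element by element
  ref_ch     = Channel.value('shared_reference')            // value channel, reused for all three tasks
  COMBINE(samples_ch, ref_ch).view()
}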
Operators
Channels are not immutable objects; they can be manipulated using operators. There are many operators covering several key areas of manipulation, including filtering, reduction, text parsing, combining channels, forking, and simple mathematical operations. In this workshop we will demonstrate several of these, but by no means all; you are encouraged to consult the documentation.
Note that Nextflow utilizes something called implicit parallelism: if a process or channel operator has everything it needs to run, it will run in parallel as resources allow. As such, when you run these commands the order of the output may vary from what is shown below.
branch
: Split a channel into multiple channels by routing each item to the first condition it matches.
Channel.of(500,700,"tigers","this",1001,1235)
    .branch{
        nan : it == "this"
        nan2: it == "tigers"
        num : it < 1000
        num2: it > 1000 }
    .set{result}

result.nan.view {"$it is not a number"}
result.num.view {"$it is a number less than 1000"}
result.nan2.view{"$it is not a number"}
result.num2.view{"$it is a number Greater than 1000"}
output:
500 is a number less than 1000
1001 is a number Greater than 1000
this is not a number
1235 is a number Greater than 1000
700 is a number less than 1000
tigers is not a number
sum and set
: Sum the entire channel and assign the result to a new channel with set.
Channel.of(132508,2359730,3285908392085,320,0.333,0.3532432,23430.3)
.sum().set{result2}
result2.view {"The sum of the channel is $it"}
output:
The sum of the channel is 3285910908073.9862432
max
: Identify the maximum value in the channel.
Channel.of(132508,2359730,3285908392085,320,0.333,0.3532432,23430.3)
.max()
.view{"Max value : $it"}
output:
Max value : 3285908392085
join
: Creates a channel that joins together the items emitted by two other channels when a matching key exists; by default the first element of each item is used as the key. Also note that entries with no matching pair are not reported in the view statement.
primary_ch = Channel.of(['Tigers',1],['Clemson',1889],['CCIT',1000],['CAFLS',13245])
secondary_ch = Channel.of(['Tigers',5],['Clemson',2018],['CAFLS',2018])
primary_ch.join(secondary_ch).view()
output:
[Tigers, 1, 5]
[Clemson, 1889, 2018]
[CAFLS, 13245, 2018]
distinct
: Removes consecutive duplicated items from a channel, so that each emitted item is different from the preceding one.
repeat_ch = Channel.of('Go','Go','Go','Tigers','Tigers','Tigers','win!','win!','win!','Go','Go','Tigers','Tigers')
repeat_ch.distinct().view()
output:
Go
Tigers
win!
Go
Tigers
randomSample
: A pseudo-random sampler. It randomly samples the requested number of items from the channel and, with a fixed seed, returns the same sample until the input changes. Note that in this example the second argument, 234, is the seed; setting it is critical for reproducibility.
Channel.of(1..1000)
.randomSample(7, 234)
.view()
output:
1
5
100
254
678
999
672
Processes
Processes are the functions of Nextflow. They consist of four parts: the process declaration, input channel declarations, output channel declarations, and a script block. The script block must always come at the end of the process declaration.
For each discrete data point entering a process, Nextflow spins up what is called a task. Each task is an instance of the process, and tasks run in parallel with one another. Tasks are isolated and do not share write state, enabling parallel I/O; their only means of communication is channels. Another thing to keep in mind is that processes do not share configuration: if you load a module in one process, it will not persist to the next process. You either have to declare such things explicitly in each process declaration or set them in a configuration file; a sketch of the per-process approach follows the example below.
process DoSomething {
  input:
  path x

  output:
  path 'result.rds'

  script:
  """
  Rscript DoSomething.R $x result.rds
  """
}
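As a sketch of declaring configuration per process (the module name r/4.2.2 and the R scripts here are assumptions for illustration, not part of the example above), each process that needs an environment module must name it in its own declaration:

process STEP_ONE {
  module 'r/4.2.2'   // loaded only for tasks of this process

  input:
  path x

  output:
  path 'step_one.rds'

  script:
  """
  Rscript step_one.R $x step_one.rds
  """
}

process STEP_TWO {
  module 'r/4.2.2'   // must be declared again; it does not carry over from STEP_ONE

  input:
  path rds

  output:
  path 'step_two.rds'

  script:
  """
  Rscript step_two.R $rds step_two.rds
  """
}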
Executors
An executor defines the environment in which the computation is executed. Executors are abstractions, or simplifications, that enable greater portability between different HPC systems. If you develop a workflow on AWS Batch and want to run it on a system using Slurm, you should only have to modify the configuration file to port it between the two.
These abstractions are powerful: they not only negotiate with the scheduler on your behalf, they also generate the batch scripts for the jobs Nextflow spins up for you.
If you define a scheduler-backed executor, each task will be submitted as a new job, so please be cognizant of how many jobs you spin up, and how often, by using the correct executor configuration.
executor {
  name = 'slurm'
  queueSize = 4
  pollInterval = '30 sec'
}
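For example, porting the same workflow between schedulers is mostly a matter of swapping the executor name in the process scope of the config ('slurm' and 'awsbatch' are standard Nextflow executor names; the rest of the workflow stays unchanged):

process {
  executor = 'slurm'       // submit each task as a Slurm job
  // executor = 'awsbatch' // the same workflow submitted to AWS Batch instead
}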
Config files
A config file defines the configuration of the workflow. These configuration files consist of parameter declarations, container calls, executor definitions, resource requests, module loads, etc.
params.str = "Hello world!"

process {
  executor = 'slurm'
  cpus = 4
  memory = 10.GB
  module = ['r/4.2.2', 'anaconda3']
}

executor {
  queueSize = 4
  pollInterval = '30 sec'
}
Nextflow File System and Resume
Before discussing workflow and subworkflow declarations, let's take a step aside and discuss the working file system that Nextflow uses. For each task generated, Nextflow creates a unique hash. This hash is then used to create a subdirectory inside a directory called "work". In this subdirectory it stores any and all files related to that task, including any results your analysis generates. You must use the publishDir directive in your process declarations to ensure that your results are written to another directory, as sketched below. The listing after the sketch shows a visual breakdown of a work directory.
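Here is a minimal sketch of publishDir (the process name, the results/ directory, and the summary command are placeholders, not part of the original material): declared outputs are copied out of the work directory into a location you choose.

process SummarizeSomething {
  publishDir 'results/', mode: 'copy'   // copy declared outputs into results/

  input:
  path x

  output:
  path 'summary.txt'

  script:
  """
  wc -l $x > summary.txt
  """
}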
work/
├── 38
│ └── 7326dfe49eea2ca192015b4752cb54
│ └── moving_picture_tutorial -> /home/zigerst/scripts/nextflow_tests/work/f5/9ae0c094ed30ea3f83b632a6dc9c9b/moving_picture_tutorial
├── 53
│ └── a903dd35193d06feab1cd2c226adc3
│ └── moving_picture_tutorial -> /home/zigerst/scripts/nextflow_tests/work/f5/9ae0c094ed30ea3f83b632a6dc9c9b/moving_picture_tutorial
├── b9
│ └── 4277ef702247713182d8a632dc572b
│ └── moving_picture_tutorial -> /home/zigerst/scripts/nextflow_tests/work/f5/9ae0c094ed30ea3f83b632a6dc9c9b/moving_picture_tutorial
├── dc
│ └── 2a839d95100393c2f1c0c462115e73
│ └── moving_picture_tutorial -> /home/zigerst/scripts/nextflow_tests/work/f5/9ae0c094ed30ea3f83b632a6dc9c9b/moving_picture_tutorial
└── f5
└── 9ae0c094ed30ea3f83b632a6dc9c9b
├── moving_picture_tutorial
│ ├── aligned-rep-seqs.qza
│ ├── demux-details.qza
│ ├── demux.qza
│ ├── emp-single-end-sequences
│ │ ├── barcodes.fastq.gz
│ │ └── sequences.fastq.gz
│ ├── emp-single-end-sequences.qza
│ ├── masked-aligned-rep-seqs.qza
│ ├── rep-seqs.qza
│ ├── rooted-tree.qza
│ ├── sample_metadata.tsv
│ ├── stats.qza
│ ├── table.qza
│ └── unrooted-tree.qza
└── test.sh -> /home/zigerst/scripts/nextflow_tests/scripts/test.sh
Now, this may seem like a major drawback, but it's done for a reason: this hash system enables Nextflow's resume functionality. If the workflow is interrupted and restarted, Nextflow checks its cache for the recorded hashes that correspond to the hash directories in work and resumes the workflow from there. So it's important to preserve the .nextflow/cache directory as well as the work directory while you are working.
Once you have finished your work and verified that your results are saved to a permanent location, please make sure to erase the work directory! It eats up space quickly when left unattended.
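As a sketch of the cleanup (double-check that you no longer need to resume before deleting anything), you can either use Nextflow's built-in clean command or remove the directory outright:

nextflow clean -f   # remove the work files recorded for the last run (see nextflow clean -h for options)
rm -rf work/        # or delete the entire work directory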
Workflows, Modularity and Subworkflows
The workflow declaration dictates the order in which the defined processes are executed. Nextflow utilizes 'lazy' execution, meaning it does not simply walk a pre-built plan of the declaration; rather, it parses the workflow logic and runs whatever can be run as inputs become available. This is why your channel setup is so critical in Nextflow workflows: which channels are populated, and which processes expect them, dictates what is executed first.
Workflows can call elements from other .nf files. This modularity allows you to reuse processes developed for other workflows without having to rewrite them. This also enables binary usage within a workflow. Additionally, you can call other workflows within a workflow, allowing you to reuse entire workflows in novel situations.
// Workflow declarations
workflow letters {
  splitLetters | flatten | capitalizeLetters | view { it.trim() }
}
workflow qiime {
prepare_data(params.init)
init_qiime(prepare_data.out)
demux(init_qiime.out)
denoise(demux.out)
phylogeny(denoise.out)
}
// Calling modules
include { Someprocess } from './somefile.nf'
// Calling subworkflows
workflow {
letters()
qiime()
}
Running Nextflow on Palmetto2
module load nextflow
module load openjdk/11.0.20.1_1
nextflow run <file.nf>
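If a run is interrupted, the resume behavior described earlier can be invoked on the next submission with the -resume flag (this assumes the work directory and .nextflow cache from the previous run are still in place):

nextflow run <file.nf> -resume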
Note: Explaining container functionality is beyond the scope of this workshop.