Reduce Snakemake runtime by collapsing similar jobs into one multiprocessing job

Snakemake is a pipeline management system written in Python. It enables users to encode tasks as rules that describe how to create output files from input files. These rules are written in a Snakefile (similar to a makefile), and are chained together using input and output files (matched by file path patterns) to form a pipeline. For instance, a simple variant calling pipeline that starts with fastq files and ends with vcf files has an alignment rule, a sort/index bam rule, and a variant calling rule. The alignment rule shows how to take a fastq file and make an aligned bam file. The sort/index rule describes how to make a sorted and indexed bam file from the aligned bam file. The variant calling rule shows how to use the indexed bam file to make a vcf file containing variants.
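To make that concrete, here is a minimal sketch of such a Snakefile. The tool choices (bwa, samtools, bcftools) and file layout are illustrative rather than a specific pipeline; the point is that each rule's input pattern matches the previous rule's output pattern, which is how Snakemake chains the rules together.

# Sketch: a minimal variant calling Snakefile
# tool choices and paths are illustrative

rule align:
    input:  'fastq/{sample}.fastq'
    output: 'bam/{sample}.bam'
    shell:  'bwa mem ref.fa {input} | samtools view -b - > {output}'

rule sort_index:
    input:  'bam/{sample}.bam'
    output: 'sorted/{sample}.bam',
            'sorted/{sample}.bam.bai'
    shell:  'samtools sort -o {output[0]} {input}; samtools index {output[0]}'

rule call_variants:
    input:  'sorted/{sample}.bam',
            'sorted/{sample}.bam.bai'
    output: 'vcf/{sample}.vcf'
    shell:  'bcftools mpileup -f ref.fa {input[0]} | bcftools call -mv -o {output}'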

I use Snakemake in every project because it allows me to maintain a record of every step in my analysis. It also has a number of convenient features. It will re-run rules based on upstream file changes: in the variant calling example, if I change the mapping of my bam file, Snakemake knows that my vcf file needs to be regenerated. Snakemake will show me the progress of my pipeline, and a dry run will tell me which rules would be executed. I consider Snakemake to be one of the most valuable tools I use, but it does have some issues.

The biggest problem I have with Snakemake is scalability. Snakemake becomes unbearably slow when it manages over ten thousand files. It can easily take half an hour for Snakemake to do a dry run of some of my pipelines. That’s a long time to wait if I just need to process 10 new samples. Below I’ll describe a specific instance of a scaling issue I had, and how I solved it using Python’s multiprocessing module.

I start with 700 whole-genome samples. The variant calls and associated data, like read depth and alternative allele count, are stored in sample-specific HDF5 files. I have another file with thousands of positions that I want to query against each sample's HDF5 variant set. I begin with one sample, and query its HDF5 variant file using Python's PyTables. In my Snakefile, I have one rule that takes an HDF5 file and my query position file as input, and runs a script to make an output file containing the query results.

# Snakefile 1: queries all positions by sample
# too slow

rule query_hdf5:
    input:  'hdf5bySample/{sample}.hdf',
            'query_pos_file'
    output: 'hdf5QueryAll/{sample}'
    shell:  'python query_hdf5.py {wildcards.sample} {input} {output}'

rule all:
    input: 'hdf5QueryAll/sample_1'
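For reference, a minimal sketch of what query_hdf5.py does looks like this. The HDF5 layout and the query file columns here match the multiprocessing script shown later in this post, so treat them as assumptions about the serial script rather than its exact contents.

# Sketch of query_hdf5.py: one serial PyTables query per position
# layout and columns mirror the multiprocessing script shown later

import sys, csv, tables

def main(sample, hdf_file, query_file, out_file):
    h5file = tables.open_file(hdf_file, mode='r')
    table = h5file.root.posCollection.posLs
    with open(query_file) as f, open(out_file, 'w') as fout:
        for row in csv.DictReader(f):
            chrom = int(row['chrom'])
            pos = int(row['start']) + 1
            # thousands of positions, one where() call each:
            # this serial loop is the bottleneck
            for hit in table.where('(chrom == %d) & (pos == %d)'
                                   % (chrom, pos)):
                print('\t'.join(str(field) for field in hit[:]),
                      file=fout)
    h5file.close()

if __name__ == '__main__':
    main(*sys.argv[1:5])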

I run the rule for one sample, and I kill the job after half an hour because it has only finished a few chromosomes. This rule needs to run chromosomes in parallel to finish one sample in a reasonable amount of time.

I split my query position file by chromosome, and alter my Snakefile rule so that it takes a chromosome-specific query position file. When all 24 chromosomes run at once, I can do one sample in ten minutes.
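Splitting the query position file is a quick one-off step; a minimal sketch, assuming the file is a CSV with a chrom column (the same format the query script reads), might look like:

# Sketch: split query_pos_file into one file per chromosome
# assumes a CSV with a 'chrom' column

import csv

with open('query_pos_file') as f:
    reader = csv.DictReader(f)
    handles, writers = {}, {}
    for row in reader:
        chrom = row['chrom']
        if chrom not in writers:
            handles[chrom] = open('query_pos_file.' + chrom, 'w', newline='')
            writers[chrom] = csv.DictWriter(handles[chrom],
                                            fieldnames=reader.fieldnames)
            writers[chrom].writeheader()
        writers[chrom].writerow(row)
    for handle in handles.values():
        handle.close()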

# Snakefile 2: queries chrom positions by sample
# runs fast in parallel

rule query_hdf5:
    input:  'hdf5bySample/{sample}.hdf',
            'query_pos_file.{chrom}'
    output: 'hdf5QueryAll/{sample}.{chrom}'
    shell:  'python query_hdf5.py {wildcards.sample} {input} {output}'

rule all:
    input: expand('hdf5QueryAll/sample_1.{chrom}', chrom=CHROMS)

When I try to run several hundred samples at the same time, I hit Snakemake’s scalability issue. Instead of running the initial sample-based rule 700 times in Snakefile 1, I’m running the sample/chromosome-based rule 16,800 times in Snakefile 2. Snakemake is now so slow that I can grab lunch before it finishes enumerating all the files it needs to make. This will be horrible for me in a month when I come back with more samples, or need to add an additional rule to the pipeline.

I have a solution that keeps the ten-minute-per-sample run time, and gives Snakemake only 700 output files (one per sample) to manage. I keep one Snakemake job per sample. That rule takes the sample’s HDF5 variants as well as the 24 files listing the query positions by chromosome, and outputs one file listing all the query results.

# Snakefile 3: rules operate by sample
# but still split queries by chrom
# query_hdf5_mp.py handles multiprocessing by chrom

rule query_hdf5:
    input:   'hdf5bySample/{sample}.hdf',
             expand('query_pos_file.{chrom}', chrom=CHROMS)
    output:  'hdf5QueryAll/{sample}'
    threads: 24
    shell:   
             'python query_hdf5_mp.py {wildcards.sample} \
              {input} {output}'

rule all:
    input: expand('hdf5QueryAll/{sample}', sample=SAMPLES)

Assigning 24 threads to this rule lets me grab 24 slots for multiprocessing when the rule runs on a cluster. Inside this rule, I use a script that creates one process per chromosome. Each process loops through its chromosome’s query positions and queries the HDF5 variants. When all processes are done, the results are collapsed into one file. The query script with multiprocessing is below.

"""
query_hdf5_mp.py
Python script to query all chroms in parallel
"""
import sys, tables, csv, os
from multiprocessing import Process, Pool

def query_hdf(table, chrom, start):
    return [x for x in 
            table.where('(chrom == %d) & (pos == %d)'
                         % (chrom, start))]
def process_row(sample, row, table, fout):
    start = int(row['start']) + 1
    chrom = row['chrom']
    result_ls = query_hdf(table, chrom, start)
    for r in result_ls:
        print('\t'.join(r), file=fout)

def query_file(params):
    query_file_name, out_file_name, hdf_file, sample = params

    # open connection to hdf5
    h5file = tables.open_file(hdf_file, mode="r")
    table = h5file.root.posCollection.posLs

    with open(query_file_name) as f, open(out_file_name, 'w') as fout:
        reader = csv.DictReader(f)
        for row in reader:
            process_row(sample, row, table, fout)

def main(sample, hdf_file, query_files, out_file):
    # create tmp files to store query results
    out_files = [afile + '.' + sample + '.tmp'
                 for afile in query_files]

    # run 24 processes at a time
    pool = Pool(processes=24)
    process_param_ls = []
    for query_pos_file, out_file in zip(query_files, out_files):
        args = [query_pos_file, out_file, hdf_file, sample]
        input_ls.append(args)

    # start processes
    result = pool.map(read_file, input_ls)

    # cat tmp query files
    os.system('touch ' + out_file)
    for gemini_file in files:
        os.system('cat {} >> {}'.format(gemini_file + '.' + sample
                                        + '.tmp',
                                        out_file))

if __name__ == "__main__":
    sample, hdf_file = sys.argv[1:3]
    gemini_files = sys.argv[3:-1]
    out_file = sys.argv[-1]
    main(sample, hdf_file, gemini_files, out_file)

I can run this on our cluster using 24 slots for each query job, and Snakemake spends almost no time determining what rules to run:

snakemake -s snakefile.py all --drmaa " -pe smp {threads}" \
-j 100 --latency-wait 40 --greediness 0.7

In conclusion, if you find yourself waiting on Snakemake dry runs and dependency checks, try limiting the number of files you produce by shoving some of the multiprocessing into a Python script.
