Skip to content

Commit

Permalink
Fix bigwigaverageoverbed by splitting file by size (and adding releva…
Browse files Browse the repository at this point in the history
…nt functionality in a FileView and new function), fix bedgraphtobigwig by making index_chroms check if not sorted, and cleanup bench a bit.
  • Loading branch information
jackh726 committed Nov 13, 2023
1 parent df157c9 commit 1fcfdac
Show file tree
Hide file tree
Showing 11 changed files with 493 additions and 115 deletions.
74 changes: 42 additions & 32 deletions bench/bench.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
import os
import re
import resource
import struct
import subprocess
import sys
import urllib.request

from monitorps import monitor

# To run (example): python3 bench/bench.py ./workdir/kent ./target/release

timeregex = re.compile(r'(\d.*)user (\d.*)system (\d.*)elapsed')
ucsctoolspath = None
bigtoolspath = None
reps = 3
reps = 1

def download(url, name):
filename = name.replace('.gz', '')
if not os.path.exists(f'./workdir/{filename}'):
print(f"Downloading {name}")
urllib.request.urlretrieve(url, f'./workdir/{name}')
if name.endswith(".gz"):
print(f"Ungzipping {name}")
subprocess.check_call(['gunzip', f'./workdir/{name}'])

def download_test_data():
print("Downloading test data")
if not os.path.exists("./workdir"):
os.makedirs("./workdir")
if not os.path.exists("./workdir/output"):
os.makedirs("./workdir/output")
def download(url, name):
filename = name.replace('.gz', '')
if not os.path.exists(f'./workdir/{filename}'):
print(f"Downloading {name}")
urllib.request.urlretrieve(url, f'./workdir/{name}')
if name.endswith(".gz"):
print(f"Ungzipping {name}")
subprocess.check_call(['gunzip', f'./workdir/{name}'])
os.makedirs("./workdir/output", exist_ok=True)

download('https://www.encodeproject.org/files/ENCFF937MNZ/@@download/ENCFF937MNZ.bigWig', 'ENCFF937MNZ.bigWig')
download('https://www.encodeproject.org/files/ENCFF447DHW/@@download/ENCFF447DHW.bigWig', 'ENCFF447DHW.bigWig')
Expand All @@ -35,8 +33,16 @@ def download(url, name):
download('https://www.encodeproject.org/files/ENCFF646AZP/@@download/ENCFF646AZP.bed.gz', 'ENCFF646AZP.bed.gz')
download('https://www.encodeproject.org/files/ENCFF076CIO/@@download/ENCFF076CIO.bed.gz', 'ENCFF076CIO.bed.gz')
download('https://raw.githubusercontent.com/ENCODE-DCC/encValData/562ab5bf03deff9bb5340991fd5c844162b82914/GRCh38/GRCh38_EBV.chrom.sizes', 'hg38.chrom.sizes')
print("Done downloading test data")

def download_kent_tools():
print("Downloading kent tools")
os.makedirs('./workdir/kent', exist_ok=True)

download('http://hgdownload.cse.ucsc.edu/admin/exe/linux.x86_64/bedToBigBed', 'kent/bedToBigBed')
download('http://hgdownload.cse.ucsc.edu/admin/exe/linux.x86_64/bedGraphToBigWig', 'kent/bedGraphToBigWig')
download('http://hgdownload.cse.ucsc.edu/admin/exe/linux.x86_64/bigWigAverageOverBed', 'kent/bigWigAverageOverBed')
download('http://hgdownload.cse.ucsc.edu/admin/exe/linux.x86_64/bigWigMerge', 'kent/bigWigMerge')
download('http://hgdownload.cse.ucsc.edu/admin/exe/linux.x86_64/bigWigToBedGraph', 'kent/bigWigToBedGraph')

def time(exeargs_all, bench, program, rep):
total_seconds = 0
Expand All @@ -45,9 +51,11 @@ def time(exeargs_all, bench, program, rep):
print(exeargs)
logfile = f'./workdir/output/{bench}_{program}_{rep}_{i}.log'
with subprocess.Popen(exeargs, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True) as process:
monitor(process.pid, logfile, 0.1)
monitor(process.pid, logfile, 0.25)
out, err = process.communicate()
out = out.decode('utf-8')
err = err.decode('utf-8')
print(out, err)
m = timeregex.search(err)
elapsed_split = m.group(3).split(':')
# Seconds
Expand Down Expand Up @@ -95,23 +103,30 @@ def bigwigaverageoverbed_long(comp):
if not os.path.exists('./workdir/ENCFF076CIO_cut_sample.bed'):
process = subprocess.check_call(f'{bigtoolspath}/bigtools chromintersect -a ./workdir/ENCFF076CIO.bed -b ./workdir/ENCFF937MNZ.bigWig -o -' + ' | cut -f1-3 | awk -v OFS=\'\\t\' \'{print $1,$2,$3, NR}\' | shuf --random-source=./workdir/ENCFF076CIO.bed | head -1000000 | sort -k1,1 -k2,2n > ./workdir/ENCFF076CIO_cut_sample.bed', shell=True)
ucsc = [['{}/bigWigAverageOverBed'.format(ucsctoolspath), './workdir/ENCFF937MNZ.bigWig', './workdir/ENCFF076CIO_cut_sample.bed', './workdir/test_out_ucsc.bed']]
bigtools_mt = [['{}/bigwigaverageoverbed'.format(bigtoolspath), './workdir/ENCFF937MNZ.bigWig', './workdir/ENCFF076CIO_cut_sample.bed', './workdir/test_out_bigtools.bed', '-t 4']]
bigtools_st = [['{}/bigwigaverageoverbed'.format(bigtoolspath), './workdir/ENCFF937MNZ.bigWig', './workdir/ENCFF076CIO_cut_sample.bed', './workdir/test_out_bigtools.bed']]
bigtools_mt = [['{}/bigwigaverageoverbed'.format(bigtoolspath), './workdir/ENCFF937MNZ.bigWig', './workdir/ENCFF076CIO_cut_sample.bed', './workdir/test_out_bigtools.bed', '-t 6']]
bigtools_st = [['{}/bigwigaverageoverbed'.format(bigtoolspath), './workdir/ENCFF937MNZ.bigWig', './workdir/ENCFF076CIO_cut_sample.bed', './workdir/test_out_bigtools.bed', '-t 1']]
compare(comp, 'bigwigaverageoverbed_long', ucsc, bigtools_mt, bigtools_st)

def bigwigmerge(comp):
global ucsctoolspath
global bigtoolspath
ucsc = [['{}/bigWigMerge'.format(ucsctoolspath), './workdir/ENCFF937MNZ.bigWig', './workdir/ENCFF447DHW.bigWig', './workdir/test_out_ucsc.bedGraph']]
bigtools = [['{}/bigwigmerge'.format(bigtoolspath), './workdir/test_out_bigtools.bedGraph', '-b ./workdir/ENCFF937MNZ.bigWig', '-b ./workdir/ENCFF447DHW.bigWig']]
compare(comp, 'bigwigmerge_bedgraph', ucsc, bigtools)
def bigwigmerge_bigwig(comp):
ucsc = [
['{}/bigWigMerge'.format(ucsctoolspath), './workdir/ENCFF937MNZ.bigWig', './workdir/ENCFF447DHW.bigWig', './workdir/test_out_ucsc.bedGraph'],
['{}/bedGraphToBigWig'.format(ucsctoolspath), './workdir/test_out_ucsc.bedGraph', './workdir/hg38.chrom.sizes', './workdir/test_out_ucsc.bigWig']
]
bigtools = [['{}/bigwigmerge'.format(bigtoolspath), './workdir/test_out_bigtools.bigWig', '-b ./workdir/ENCFF937MNZ.bigWig', '-b ./workdir/ENCFF447DHW.bigWig']]
bigtools_st = [['{}/bigwigmerge'.format(bigtoolspath), './workdir/test_out_bigtools.bigWig', '-b ./workdir/ENCFF937MNZ.bigWig', '-b ./workdir/ENCFF447DHW.bigWig', '-t 1']]
compare(comp, 'bigwigmerge_bigwig', ucsc, bigtools, bigtools_st)

def bigwigmerge_bedgraph(comp):
global ucsctoolspath
global bigtoolspath
ucsc = [['{}/bigWigMerge'.format(ucsctoolspath), './workdir/ENCFF937MNZ.bigWig', './workdir/ENCFF447DHW.bigWig', './workdir/test_out_ucsc.bedGraph']]
bigtools = [['{}/bigwigmerge'.format(bigtoolspath), './workdir/test_out_bigtools.bedGraph', '-b ./workdir/ENCFF937MNZ.bigWig', '-b ./workdir/ENCFF447DHW.bigWig']]
compare(comp, 'bigwigmerge_bedgraph', ucsc, bigtools)

def bigwigmerge(comp):
bigwigmerge_bedgraph(comp)
#bigwigmerge_bigwig(comp)


def bedgraphtobigwig(comp):
global ucsctoolspath
Expand Down Expand Up @@ -156,6 +171,8 @@ def main():
bigtoolspath = sys.argv[2] if len(sys.argv) > 2 else "/usr/local/bin"
bench = sys.argv[3] if len(sys.argv) > 3 else None
download_test_data()
download_kent_tools()
print("Running benchmarks.")
with open("./workdir/output/comparison.txt", "w") as comp:
if bench is None or bench == 'bigwigaverageoverbed':
bigwigaverageoverbed(comp)
Expand All @@ -172,10 +189,3 @@ def main():

if __name__ == '__main__':
main()

# To run (example): python3 bench/bench.py ./workdir/kent ./target/release
# wget http://hgdownload.cse.ucsc.edu/admin/exe/linux.x86_64/bedToBigBed
# wget http://hgdownload.cse.ucsc.edu/admin/exe/linux.x86_64/bedGraphToBigWig
# wget http://hgdownload.cse.ucsc.edu/admin/exe/linux.x86_64/bigWigAverageOverBed
# wget http://hgdownload.cse.ucsc.edu/admin/exe/linux.x86_64/bigWigMerge
# wget http://hgdownload.cse.ucsc.edu/admin/exe/linux.x86_64/bigWigToBedGraph
1 change: 0 additions & 1 deletion bench/monitorps.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ def main():
start_and_monitor(args.command, args.log, args.interval)

def start_and_monitor(command, logfile, interval):
command = args.command
print("Starting up command '{0}' and attaching to process".format(command))
sprocess = subprocess.Popen(command, shell=True)
pid = sprocess.pid
Expand Down
2 changes: 1 addition & 1 deletion src/bbi/bedchromdata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ mod tests {
dir.push("multi_chrom.bedGraph");

let chrom_indices: Vec<(u64, String)> =
crate::bed::indexer::index_chroms(File::open(dir.clone())?)?;
crate::bed::indexer::index_chroms(File::open(dir.clone())?)?.unwrap();

let mut chsi = BedParserParallelStreamingIterator::new(
chrom_indices,
Expand Down
5 changes: 2 additions & 3 deletions src/bed/bedparser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
//! The second layer of abstraction (`BedParser`) manages the state information for when values switch
//! from one chromosome to another. The is important because bigwig/bigbed writing is "chunked" by chromosome.

use std::fs::File;
use std::io::{self, BufRead, BufReader, Read};
use std::sync::Arc;

Expand Down Expand Up @@ -219,8 +218,8 @@ impl<S: StreamingBedValues> BedParser<S> {
}
}

impl BedParser<BedFileStream<BedEntry, BufReader<File>>> {
pub fn from_bed_file(file: File) -> Self {
impl<R: Read> BedParser<BedFileStream<BedEntry, BufReader<R>>> {
pub fn from_bed_file(file: R) -> Self {
BedParser::new(BedFileStream {
bed: StreamingLineReader::new(BufReader::new(file)),
parse: parse_bed,
Expand Down
22 changes: 15 additions & 7 deletions src/bed/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::utils::indexlist::{Index, IndexList};
use crate::utils::tell::Tell;

/// Returns a Vec of offsets into a bed file, and the chromosome starting at each offset.
pub fn index_chroms(file: File) -> io::Result<Vec<(u64, String)>> {
pub fn index_chroms(file: File) -> io::Result<Option<Vec<(u64, String)>>> {
let mut file = BufReader::new(file);

let mut line = String::new();
Expand Down Expand Up @@ -73,9 +73,10 @@ pub fn index_chroms(file: File) -> io::Result<Vec<(u64, String)>> {
let tell = file.tell()?;
file.read_line(line)?;
let chrom = parse_line(&*line)?;
if chrom.is_none() {
return Ok(());
}
let chrom = match chrom {
Some(chrom) => chrom,
None => return Ok(()),
};

// There are three options:
// 1) The chrom is the same as the previous one. We need to index
Expand All @@ -96,7 +97,7 @@ pub fn index_chroms(file: File) -> io::Result<Vec<(u64, String)>> {
// to continue to index between the previous and current as well as
// between the current and next.

let curr = chroms.insert_after(prev, (tell, chrom.unwrap())).unwrap();
let curr = chroms.insert_after(prev, (tell, chrom)).unwrap();

if chroms[curr].1 != chroms[prev].1 && tell < next_tell {
do_index(file_size, file, chroms, line, prev, Some(curr), limit - 1)?;
Expand All @@ -121,8 +122,15 @@ pub fn index_chroms(file: File) -> io::Result<Vec<(u64, String)>> {

let mut chroms: Vec<_> = chroms.into_iter().collect();
chroms.dedup_by_key(|index| index.1.clone());
let mut deduped_chroms = chroms.clone();
deduped_chroms.sort();
deduped_chroms.dedup_by_key(|index| index.1.clone());
if chroms.len() != deduped_chroms.len() {
dbg!(&chroms, &deduped_chroms);
return Ok(None);
}

Ok(chroms)
Ok(Some(chroms))
}

#[cfg(test)]
Expand Down Expand Up @@ -164,7 +172,7 @@ mod tests {
}

let f = File::open(dir)?;
let indexed_chroms = index_chroms(f)?;
let indexed_chroms = index_chroms(f)?.unwrap();
assert_eq!(chroms, indexed_chroms);

Ok(())
Expand Down
33 changes: 24 additions & 9 deletions src/bin/bedgraphtobigwig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ struct Cli {
/// The output bigwig path
output: String,

/// Set whether to read and convert the bedGraph in parallel.
/// Set whether to read and convert the bedGraph in parallel. Requires that the bedGraph is sorted.
/// Can take `auto` (default), `yes`, `no`. Ignored when input is stdin or when nthreads is `1`.
#[arg(short = 'p', long)]
#[arg(default_value = "auto")]
Expand Down Expand Up @@ -109,21 +109,35 @@ fn main() -> Result<(), Box<dyn Error>> {
outb.write_singlethreaded(chrom_map, chsi, pool)?;
} else {
let infile = File::open(&bedgraphpath)?;
let parallel = match (nthreads, matches.parallel.as_ref()) {
(1, _) | (_, "auto") => infile.metadata()?.len() >= 200_000_000,
(_, "yes") => true,
(_, "no") => false,
let (parallel, parallel_required) = match (nthreads, matches.parallel.as_ref()) {
(1, _) | (_, "no") => (false, false),
(_, "auto") => (infile.metadata()?.len() >= 200_000_000, false),
(_, "yes") => (true, true),
(_, v) => {
eprintln!(
"Unexpected value for `parallel`: \"{}\". Defaulting to `auto`.",
v
);
true
(infile.metadata()?.len() >= 200_000_000, false)
}
};
if parallel {
let chrom_indices: Vec<(u64, String)> = index_chroms(infile)?;

let chrom_indices = match parallel {
false => None,
true => {
let index = index_chroms(infile)?;
match (index, parallel_required) {
(Some(index), _) => Some(index),
(None, true) => {
eprintln!(
"Parallel conversion requires a sorted bedGraph file. Cancelling.",
);
return Ok(());
}
(None, false) => None,
}
}
};
if let Some(chrom_indices) = chrom_indices {
if matches.single_pass {
let chsi = BedParserParallelStreamingIterator::new(
chrom_indices,
Expand All @@ -149,6 +163,7 @@ fn main() -> Result<(), Box<dyn Error>> {
)?;
}
} else {
let infile = File::open(&bedgraphpath)?;
if matches.single_pass {
let vals_iter = BedParser::from_bedgraph_file(infile);

Expand Down
Loading

0 comments on commit 1fcfdac

Please sign in to comment.