Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added an optimisation to avoid an unnecessary shuffle of unpaired reads #2282

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class TransformFragments(protected val args: TransformFragmentsArgs) extends BDG

val rdd = if (args.loadAsAlignments) {
sc.loadAlignments(args.inputPath)
.toFragments
.toFragments()
} else {
sc.loadFragments(args.inputPath)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3680,7 +3680,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
.querynameSortedToFragments
} else {
info(s"Loading $pathName as BAM/CRAM/SAM and converting to Fragments.")
loadBam(pathName, stringency).toFragments
loadBam(pathName, stringency).toFragments()
}
} else if (isInterleavedFastqExt(trimmedPathName)) {
info(s"Loading $pathName as interleaved FASTQ and converting to Fragments.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,11 +463,17 @@ sealed abstract class AlignmentDataset extends AvroReadGroupGenomicDataset[Align
/**
* Convert this set of reads into fragments.
*
* @param mergePairedReads Merge paired reads into a fragment. Can be set to false when only unpaired
* reads are in the AlignmentDataset. Defaults to true.
* @return Returns a FragmentDataset where all reads have been grouped together by
* the original sequence fragment they come from.
*/
def toFragments(): FragmentDataset = {
FragmentDataset(groupReadsByFragment().map(_.toFragment),
def toFragments(mergePairedReads: Boolean = true): FragmentDataset = {
val fragmentRDD = if (mergePairedReads) groupReadsByFragment().map(_.toFragment)
else
rdd.map(SingleReadBucket.buildFragmentFromSingleAlignment)

FragmentDataset(fragmentRDD,
sequences,
readGroups,
processingSteps)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,27 @@ private[read] object SingleReadBucket {
def apply(fragment: Fragment): SingleReadBucket = {
fromGroupedReads(fragment.getAlignments.toIterable)
}

def buildFragmentFromSingleAlignment(alignment: Alignment): Fragment = {
buildFragmentFromAlignments(List(alignment), alignment.getReadName, Option(alignment.getReadGroupId), Option(alignment.getInsertSize).map(_.toInt))
}

def buildFragmentFromAlignments(reads: Seq[Alignment], readName: String, readGroupId: Option[String], insertSize: Option[Int]): Fragment = {

val builder = Fragment.newBuilder()
.setName(readName)
.setAlignments(seqAsJavaList(reads))

// is an insert size defined for this fragment?
insertSize.foreach(is => {
builder.setInsertSize(is)
})

// set read group name, if known
readGroupId.foreach(n => builder.setReadGroupId(n))

builder.build()
}
}

/**
Expand Down Expand Up @@ -139,24 +160,17 @@ private[adam] case class SingleReadBucket(
// want to pay the cost exactly once
val unionReads = allReads

// start building fragment
val builder = Fragment.newBuilder()
.setName(unionReads.head.getReadName)
.setAlignments(seqAsJavaList(allReads.toSeq))
val firstRead = unionReads.head

// is an insert size defined for this fragment?
primaryMapped.headOption
.foreach(r => {
Option(r.getInsertSize).foreach(is => {
builder.setInsertSize(is.toInt)
})
})
val insertSize = primaryMapped.headOption match {
case Some(alignment) => Option(alignment.getInsertSize).map(_.toInt)
case None => None
}

// set read group name, if known
Option(unionReads.head.getReadGroupId)
.foreach(n => builder.setReadGroupId(n))
SingleReadBucket.buildFragmentFromAlignments(unionReads.toSeq,
firstRead.getReadName, Option(firstRead.getReadGroupId),
insertSize)

builder.build()
}
}

Expand Down
592 changes: 592 additions & 0 deletions adam-core/src/test/resources/unpaired.sam

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,8 @@ class FragmentDatasetSuite extends ADAMFunSuite {
}

sparkTest("union two genomic datasets of fragments together") {
val reads1 = sc.loadAlignments(testFile("bqsr1.sam")).toFragments
val reads2 = sc.loadAlignments(testFile("small.sam")).toFragments
val reads1 = sc.loadAlignments(testFile("bqsr1.sam")).toFragments()
val reads2 = sc.loadAlignments(testFile("small.sam")).toFragments()
val union = reads1.union(reads2)
assert(union.rdd.count === (reads1.rdd.count + reads2.rdd.count))
// all of the contigs small.sam has are in bqsr1.sam
Expand Down Expand Up @@ -636,4 +636,18 @@ class FragmentDatasetSuite extends ADAMFunSuite {

assert(convertedRdd.rdd.collect().toSet == convertedDataset.rdd.collect().toSet)
}

sparkTest("Compare unpaired FragmentDataset with optimisation to non-optimised") {

val alignmentDataset = sc.loadAlignments(testFile("unpaired.sam"))

val merged = alignmentDataset.toFragments()

val notMerged = alignmentDataset.toFragments(false)

assert(merged.rdd.count === notMerged.rdd.count)

assert(merged.toDF().except(notMerged.toDF()).count() === 0)

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class MarkDuplicatesSuite extends ADAMFunSuite {

private def markDuplicateFragments(reads: Alignment*): Array[Alignment] = {
AlignmentDataset(sc.parallelize(reads), SequenceDictionary.empty, rgd, Seq.empty)
.toFragments
.toFragments()
.markDuplicates()
.toAlignments
.rdd
Expand Down
6 changes: 4 additions & 2 deletions adam-python/bdgenomics/adam/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,16 +870,18 @@ def _inferConversionFn(self, destClass):
return "org.bdgenomics.adam.api.java.AlignmentsTo%s" % self._destClassSuffix(destClass)


def toFragments(self):
def toFragments(self, mergePairedReads=True):
"""
Convert this set of reads into fragments.

:param mergePairedReads: Merge paired reads into a fragment. Can be set to false when only unpaired
reads are in the AlignmentDataset. Defaults to true.
:return: Returns a FragmentDataset where all reads have been grouped
together by the original sequence fragment they come from.
:rtype: bdgenomics.adam.rdd.FragmentDataset
"""

return FragmentDataset(self._jvmRdd.toFragments(), self.sc)
return FragmentDataset(self._jvmRdd.toFragments(mergePairedReads), self.sc)


def toCoverage(self, collapse = True):
Expand Down