java 8 streams "support functional-style operations on streams of elements, such as map-reduce transformations on collections". In this post, I will show how I've implemented a java.util.stream.Stream of VCF variants that counts the number of items in dbsnp.
This example uses the java htsjdk API for reading variants.
When using parallel streams, the main idea is to implement a java.util.Spliterator that will split the sequence dictionary (the genome) into a maximum of N (here N=5) parts. Each part will count the number of variants in 1/N genome in its own thread. As we're using an tribble indexed VCF, it's easy to start counting at a given position of the genome.
ContigPos
the class ContigPos defines a chromosome and a position in the whole genome.
class ContigPos {
/** contig/chromosome index in the dictionary */
final int tid;
/** contig/chromosome name */
final String contig;
/** position in the chromosome */
final int pos;
(...)
}
it contains a function to convert its' position to an index in the whole genome using the genome dictionary (htsjdk.samtools.SAMSequenceDictionary) .
long genomicIndex() {
long n=0L;
for(int i=0;i< this.tid;++i) {
n += dict.getSequence(i).getSequenceLength();
}
return n + this.pos;
}
VariantContextSpliterator
VariantContextSpliterator is the main class. It splits the VCF file into parts and implements Spliterator<VariantContext> .
public class VariantContextSpliterator
implements Closeable,Spliterator<VariantContext> {
(...)
VariantContextSpliterator contains the sequence dictionary and the path to the indexed VCF file
/** current VCF File reader */
private VCFFileReader vcfFileReader = null;
/** genome dictionary */
private final SAMSequenceDictionary dict ;
Each VariantContextSpliterator has is own private VCFileReader and CloseableIterator. Both should be closed when the is no more variant to be read.
/** current VCF File reader */
private VCFFileReader vcfFileReader = null;
/** current variant iterator */
private CloseableIterator<VariantContext> variantIter = null;
Each VariantContextSpliterator has a dedicated genomic region.
/* region start */
private ContigPos begin;
/** region end */
private ContigPos end ;
The very first VariantContextSpliterator will scan :
- from
begin = new ContigPos("chr1",0)
- to
end = new ContigPos("chrY",(size_of_chrY))
We don't want to open to many threads, so we're tracking the number of opened iterators in a AtomicInteger
AtomicInteger nsplits
VariantContextSpliterator.peek()
VariantContextSpliterator.peek() is a method peeking the next Variant in the genomic interval.
We open the VCFFileReader if it was never opened, the number of opened files is incremented.
/** VCF reader was never opened */
if( this.vcfFileReader == null ) {
/** open the VCF reader */
this.vcfFileReader = new VCFFileReader(this.vcfFile, true);
/** open a first iterator on the first chromosome */
this.variantIter = this.vcfFileReader.query(
this.begin.contig,
this.begin.pos,
this.dict.getSequence(this.begin.tid).getSequenceLength() /* whole chromosome size */
);
/** increase the number of opened streams */
this.nsplits.incrementAndGet();
}
while there is no more variant available on this chromosome , open the next chromosome for reading:
while(!this.variantIter.hasNext()) {
this.variantIter.close();
this.variantIter = null;
if(this.begin.tid == this.end.tid) /* this is the last chromosome */
{
close();
return null;
}
else
{
this.begin = new ContigPos(this.begin.tid+1, 0);
this.variantIter = this.vcfFileReader.query(
this.begin.contig,
this.begin.pos,
this.dict.getSequence(this.begin.tid).getSequenceLength() /* whole chromosome size */
);
}
}
get the next variant, update 'begin' with this variant. We close the VCFfileReader if we have reached the end of the genomic window.
/* get the next variant */
final VariantContext ctx = this.variantIter.next();
/* update 'begin' */
this.begin= new ContigPos(ctx.getContig(), ctx.getStart());
/** close if the end of the genomic location was reached */
if((this.begin.tid > this.end.tid) ||
(this.begin.tid == this.end.tid && this.begin.pos >= this.end.pos) ) {
close();
return null;
}
this._peeked = ctx;
return this._peeked;
VariantContextSpliterator.tryAdvance()
If a remaining variants exists, performs the given action on it, returning true; else returns false.
@Override
public boolean tryAdvance(Consumer<? super VariantContext> action) {
final VariantContext ctx = this.next();
if(ctx==null) {
close();
return false;
}
action.accept(ctx);
return true;
}
VariantContextSpliterator.trySplit()
trySplit returns a VariantContextSpliterator covering elements, that will, upon return from this method, not be covered by this VariantContextSpliterator. We can split if the remaining window size is greater than 1Mb and if the number of opened VCFReaderFile is lower than 10.
public Spliterator<VariantContext> trySplit() {
final VariantContext ctx = this.peek();
/** no more variant to read, can't split */
if(ctx==null) return null;
/** too many opened VCFFile reader, can't split */
if( this.nsplits.get()>5) return null;
long start = this.begin.genomicIndex();
long distance = this.end.genomicIndex() - start;
/** distance between begin and end is greater than 1Mb */
while(distance > 1E6 )
{
distance = distance/2;
/** middle genomic index */
final ContigPos mid = new ContigPos(start + distance);
/** create a new VariantContextSpliterator starting from mid and ending at this.end */
final VariantContextSpliterator next = new VariantContextSpliterator(this,mid,this.end);
if(next.peek()==null) {
next.close();
continue;
}
/* update this.end to 'mid' */
this.end= mid;
//System.err.println("split successful:after split "+toString()+" and next="+next);
return next;
}
return null;
}
Testing
to get a stream , we the static function java.util.stream.StreamSupport.stream
is called.
stream() Creates a new sequential or parallel Stream from a Spliterator. The spliterator is only traversed, split, or queried for estimated size after the terminal operation of the stream pipeline commences.
private Stream<VariantContext> stream(boolean parallel) {
return StreamSupport.stream(new VariantContextSpliterator(this.vcfFile), parallel);
}
We count the number of variants in dbSNP. We print the duration for stream(), parallelStream() and a standard iterator.
final File vcFile =new File(args[0]);
StreameableVcfFile test= new StreameableVcfFile(vcFile);
long start1 = System.currentTimeMillis();
System.out.println("count;"+test.parallelStream().count());
long end1 = System.currentTimeMillis();
System.out.println(" parallelstream: " + ((end1 - start1) / 1000));
long start2 = System.currentTimeMillis();
System.out.println("count;"+test.stream().count());
long end2 = System.currentTimeMillis();
System.out.println("stream : " + ((end2 - start2) / 1000));
long start3 = System.currentTimeMillis();
CloseableIterator<VariantContext> r= new VCFFileReader(vcFile).iterator();
int n=0;
while(r.hasNext()) { r.next(); ++n;}
r.close();
long end3 = System.currentTimeMillis();
System.out.println("count;"+n);
System.out.println("iter : " + ((end3 - start3) / 1000));
Output:
count: 61045456 snps
parallelstream: 80 seconds
count: 61045456 snps
stream : 365 seconds
count: 61045456 snps
iter : 355 seconds
That's it,
Pierre
Source code