The aim of this test is to count each type of variant and genotype in a VCF file using Apache Hadoop and htsjdk, the Java library for NGS. My source code is available at: https://github.com/lindenb/hadoop-sandbox/blob/master/src/main/java/com/github/lindenb/hadoop/Test.java.
First, and this is my main problem, I needed to create a class 'VcfRow' that holds all the data about a variant. As I need to keep the semantics carried by the VCF header, each record contains the whole VCF header (!). I asked StackOverflow whether there was an elegant way to keep the header around in the hadoop workflow, but it currently seems that there is no such solution (http://stackoverflow.com/questions/30052859/hadoop-mapreduce-handling-a-text-file-with-a-header). This class VcfRow must implement WritableComparable to be serialized by the hadoop pipeline. It's awfully sloooooow, since a new htsjdk.variant.vcf.VCFCodec has to re-parse the htsjdk.variant.vcf.VCFHeader for each new variant.
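For completeness, the snippets below assume roughly the following imports (package names as found in htsjdk 1.130 and Hadoop 2.7.0; see the source on github for the exact list):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import htsjdk.samtools.util.AbstractIterator;
import htsjdk.tribble.readers.LineIterator;
import htsjdk.variant.variantcontext.Genotype;
import htsjdk.variant.variantcontext.VariantContext;
import htsjdk.variant.vcf.VCFCodec;
import htsjdk.variant.vcf.VCFHeader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;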
public static class VcfRow
    implements WritableComparable<VcfRow>
    {
    private List<String> headerLines;
    private String line;
    private VariantContext ctx=null;
    private VCFHeader header =null;
    private VCFCodec codec=new VCFCodec();

    public VcfRow()
        {
        this.headerLines = Collections.emptyList();
        this.line="";
        }
    public VcfRow(List<String> headerLines,String line)
        {
        this.headerLines=headerLines;
        this.line=line;
        }

    @Override
    public void write(DataOutput out) throws IOException
        {
        out.writeInt(this.headerLines.size());
        for(int i=0;i< this.headerLines.size();++i)
            {
            out.writeUTF(this.headerLines.get(i));
            }
        byte array[]=line.getBytes();
        out.writeInt(array.length);
        out.write(array);
        }

    @Override
    public void readFields(DataInput in) throws IOException
        {
        int n= in.readInt();
        this.headerLines=new ArrayList<String>(n);
        for(int i=0;i<n;++i) this.headerLines.add(in.readUTF());
        n = in.readInt();
        byte array[]=new byte[n];
        in.readFully(array);
        this.line=new String(array);
        this.codec=new VCFCodec();
        this.ctx=null;
        this.header=null;
        }

    public VCFHeader getHeader()
        {
        if(this.header==null)
            {
            this.header = (VCFHeader)this.codec.readActualHeader(new MyLineIterator());
            }
        return this.header;
        }

    public VariantContext getVariantContext()
        {
        if(this.ctx==null)
            {
            if(this.header==null) getHeader();//force decode header
            this.ctx=this.codec.decode(this.line);
            }
        return this.ctx;
        }

    @Override
    public int compareTo(VcfRow o)
        {
        int i = this.getVariantContext().getContig().compareTo(o.getVariantContext().getContig());
        if(i!=0) return i;
        i = this.getVariantContext().getStart() - o.getVariantContext().getStart();
        if(i!=0) return i;
        i = this.getVariantContext().getReference().compareTo( o.getVariantContext().getReference());
        if(i!=0) return i;
        return this.line.compareTo(o.line);
        }

    private class MyLineIterator
        extends AbstractIterator<String>
        implements LineIterator
        {
        int index=0;
        @Override
        protected String advance()
            {
            if(index>= headerLines.size()) return null;
            return headerLines.get(index++);
            }
        }
    }
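As a side note, the Writable serialization above can be checked outside of hadoop by round-tripping a VcfRow through a byte array. This is only a hypothetical sketch, not part of the workflow (the header and variant line are made up and kept minimal, and it needs a few extra java.io/java.util imports):

// hypothetical round-trip test of the write()/readFields() methods above
List<String> headerLines = Arrays.asList(
    "##fileformat=VCFv4.1",
    "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tNA12878");
VcfRow row = new VcfRow(headerLines,
    "1\t10583\trs58108140\tG\tA\t100\tPASS\t.\tGT\t0/1");

ByteArrayOutputStream buffer = new ByteArrayOutputStream();
row.write(new DataOutputStream(buffer));              // serialize as hadoop would

VcfRow copy = new VcfRow();
copy.readFields(new DataInputStream(
    new ByteArrayInputStream(buffer.toByteArray())));  // deserialize a fresh copy
System.out.println(copy.getVariantContext().getStart()); // prints 10583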
Then a special InputFormat is created for the VCF format. As we need to keep track of the header, this class declares `isSplitable==false`. The class VcfInputFormat creates an instance of RecordReader that reads the whole VCF header the first time its `initialize` method is invoked. This 'VcfRecordReader' then creates a new VcfRow for each line.
public static class VcfInputFormat extends FileInputFormat<LongWritable, VcfRow>
    {
    private List<String> headerLines=new ArrayList<String>();

    @Override
    public RecordReader<LongWritable, VcfRow> createRecordReader(InputSplit split,
        TaskAttemptContext context) throws IOException, InterruptedException {
        return new VcfRecordReader();
        }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
        }

    //LineRecordReader
    private class VcfRecordReader extends RecordReader<LongWritable, VcfRow>
        {
        private LineRecordReader delegate=new LineRecordReader();

        public VcfRecordReader() throws IOException
            {
            }

        @Override
        public void initialize(InputSplit genericSplit,
            TaskAttemptContext context) throws IOException {
            delegate.initialize(genericSplit, context);
            while( delegate.nextKeyValue())
                {
                String row = delegate.getCurrentValue().toString();
                if(!row.startsWith("#")) throw new IOException("Bad VCF header");
                headerLines.add(row);
                if(row.startsWith("#CHROM")) break;
                }
            }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return delegate.getCurrentKey();
            }

        @Override
        public VcfRow getCurrentValue() throws IOException, InterruptedException {
            Text row = this.delegate.getCurrentValue();
            return new VcfRow(headerLines,row.toString());
            }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return this.delegate.getProgress();
            }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return this.delegate.nextKeyValue();
            }

        @Override
        public void close() throws IOException {
            delegate.close();
            }
        }
    }
The hadoop mapper uses the information of each VcfRow and produces a count for each category:
public static class VariantMapper
    extends Mapper<LongWritable, VcfRow, Text, IntWritable>
    {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, VcfRow vcfRow, Context context ) throws IOException, InterruptedException {
        VariantContext ctx = vcfRow.getVariantContext();
        if( ctx.isIndel())
            {
            word.set("ctx_indel");
            context.write(word, one);
            }
        if( ctx.isBiallelic())
            {
            word.set("ctx_biallelic");
            context.write(word, one);
            }
        if( ctx.isSNP())
            {
            word.set("ctx_snp");
            context.write(word, one);
            }
        if( ctx.hasID())
            {
            word.set("ctx_id");
            context.write(word, one);
            }
        word.set("ctx_total");
        context.write(word, one);
        for(String sample: vcfRow.getHeader().getSampleNamesInOrder())
            {
            Genotype g =vcfRow.getVariantContext().getGenotype(sample);
            word.set(sample+" "+ctx.getType()+" "+g.getType().name());
            context.write(word, one);
            }
        }
    }
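For instance, a hypothetical biallelic SNP having an ID, in a VCF with two samples (one heterozygous, one homozygous reference), would make this map() method emit the following key/value pairs:

ctx_biallelic	1
ctx_snp	1
ctx_id	1
ctx_total	1
NA12878 SNP HET	1
NA12891 SNP HOM_REF	1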
The Reducer computes the sum of each category:
public static class IntSumReducer
    extends Reducer<Text,IntWritable,Text,IntWritable>
    {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
            }
        result.set(sum);
        context.write(key, result);
        }
    }
And here is the main program:
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "snp count");
    job.setJarByClass(Test.class);
    job.setMapperClass(VariantMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    Path inputPath=new Path(args[0]);
    job.setInputFormatClass(VcfInputFormat.class);
    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
Download, compile, run (the commands below are printed with `make -Bn`):
lindenb@hardyweinberg:~/src/hadoop-sandbox$ make -Bn
rm -rf hadoop-2.7.0
curl -L -o hadoop-2.7.0.tar.gz "http://apache.spinellicreations.com/hadoop/common/hadoop-2.7.0/hadoop-2.7.0.tar.gz"
tar xvfz hadoop-2.7.0.tar.gz
rm hadoop-2.7.0.tar.gz
touch -c hadoop-2.7.0/bin/hadoop
rm -rf htsjdk-1.130
curl -L -o 1.130.tar.gz "https://github.com/samtools/htsjdk/archive/1.130.tar.gz"
tar xvfz 1.130.tar.gz
rm 1.130.tar.gz
(cd htsjdk-1.130 && ant )
mkdir -p tmp dist
javac -d tmp -cp hadoop-2.7.0/share/hadoop/common/hadoop-common-2.7.0.jar:hadoop-2.7.0/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.0.jar:hadoop-2.7.0/share/hadoop/common/lib/hadoop-annotations-2.7.0.jar:hadoop-2.7.0/share/hadoop/common/lib/log4j-1.2.17.jar:htsjdk-1.130/dist/commons-logging-1.1.1.jar:htsjdk-1.130/dist/htsjdk-1.130.jar:htsjdk-1.130/dist/commons-jexl-2.1.1.jar:htsjdk-1.130/dist/snappy-java-1.0.3-rc3.jar -sourcepath src/main/java src/main/java/com/github/lindenb/hadoop/Test.java
jar cvf dist/test01.jar -C tmp .
rm -rf tmp
mkdir -p input
curl -o input/CEU.exon.2010_09.genotypes.vcf.gz "ftp://ftp-trace.ncbi.nih.gov/1000genomes/ftp/pilot_data/paper_data_sets/a_map_of_human_variation/exon/snps/CEU.exon.2010_09.genotypes.vcf.gz"
gunzip -f input/CEU.exon.2010_09.genotypes.vcf.gz
rm -rf output
HADOOP_CLASSPATH=htsjdk-1.130/dist/commons-logging-1.1.1.jar:htsjdk-1.130/dist/htsjdk-1.130.jar:htsjdk-1.130/dist/commons-jexl-2.1.1.jar:htsjdk-1.130/dist/snappy-java-1.0.3-rc3.jar hadoop-2.7.0/bin/hadoop jar dist/test01.jar com.github.lindenb.hadoop.Test \
input/CEU.exon.2010_09.genotypes.vcf output
cat output/*
Here is the log of the hadoop job, followed by the output of the last command (`cat output/*`):
15/05/05 17:18:34 INFO input.FileInputFormat: Total input paths to process : 1
15/05/05 17:18:34 INFO mapreduce.JobSubmitter: number of splits:1
15/05/05 17:18:34 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1186897577_0001
15/05/05 17:18:34 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
15/05/05 17:18:34 INFO mapreduce.Job: Running job: job_local1186897577_0001
15/05/05 17:18:34 INFO mapred.LocalJobRunner: OutputCommitter set in config null
15/05/05 17:18:34 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
15/05/05 17:18:34 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
15/05/05 17:18:34 INFO mapred.LocalJobRunner: Waiting for map tasks
15/05/05 17:18:34 INFO mapred.LocalJobRunner: Starting task: attempt_local1186897577_0001_m_000000_0
15/05/05 17:18:34 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
15/05/05 17:18:34 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
15/05/05 17:18:34 INFO mapred.MapTask: Processing split: file:/home/lindenb/src/hadoop-sandbox/input/CEU.exon.2010_09.genotypes.vcf:0+2530564
15/05/05 17:18:34 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
15/05/05 17:18:34 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
15/05/05 17:18:34 INFO mapred.MapTask: soft limit at 83886080
15/05/05 17:18:34 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
15/05/05 17:18:34 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
15/05/05 17:18:34 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
15/05/05 17:18:35 INFO mapreduce.Job: Job job_local1186897577_0001 running in uber mode : false
15/05/05 17:18:35 INFO mapreduce.Job: map 0% reduce 0%
15/05/05 17:18:36 INFO mapred.LocalJobRunner:
15/05/05 17:18:36 INFO mapred.MapTask: Starting flush of map output
15/05/05 17:18:36 INFO mapred.MapTask: Spilling map output
15/05/05 17:18:36 INFO mapred.MapTask: bufstart = 0; bufend = 7563699; bufvoid = 104857600
15/05/05 17:18:36 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 24902536(99610144); length = 1311861/6553600
15/05/05 17:18:38 INFO mapred.MapTask: Finished spill 0
15/05/05 17:18:38 INFO mapred.Task: Task:attempt_local1186897577_0001_m_000000_0 is done. And is in the process of committing
(...)
NA12843 SNP HOM_REF 2515
NA12843 SNP HOM_VAR 242
NA12843 SNP NO_CALL 293
NA12872 SNP HET 394
NA12872 SNP HOM_REF 2282
NA12872 SNP HOM_VAR 188
NA12872 SNP NO_CALL 625
NA12873 SNP HET 336
NA12873 SNP HOM_REF 2253
NA12873 SNP HOM_VAR 184
NA12873 SNP NO_CALL 716
NA12874 SNP HET 357
NA12874 SNP HOM_REF 2395
NA12874 SNP HOM_VAR 229
NA12874 SNP NO_CALL 508
NA12878 SNP HET 557
NA12878 SNP HOM_REF 2631
NA12878 SNP HOM_VAR 285
NA12878 SNP NO_CALL 16
NA12889 SNP HET 287
NA12889 SNP HOM_REF 2110
NA12889 SNP HOM_VAR 112
NA12889 SNP NO_CALL 980
NA12890 SNP HET 596
NA12890 SNP HOM_REF 2587
NA12890 SNP HOM_VAR 251
NA12890 SNP NO_CALL 55
NA12891 SNP HET 609
NA12891 SNP HOM_REF 2591
NA12891 SNP HOM_VAR 251
NA12891 SNP NO_CALL 38
NA12892 SNP HET 585
NA12892 SNP HOM_REF 2609
NA12892 SNP HOM_VAR 236
NA12892 SNP NO_CALL 59
ctx_biallelic 3489
ctx_id 3489
ctx_snp 3489
ctx_total 3489
That's it,
Pierre