Playing with hadoop/mapreduce and htsjdk/VCF : my notebook.
The aim of this test is to count each type of variant and genotype in a VCF file using Apache Hadoop and htsjdk, the Java library for NGS. My source code is available at: https://github.com/lindenb/hadoop-sandbox/blob/master/src/main/java/com/github/lindenb/hadoop/Test.java.
First, and this is my main problem, I needed to create a class 'VcfRow' that would contain the whole data about a variant. As I need to keep the semantics declared in the VCF header, each record carries the whole VCF header (!). I asked on StackOverflow if there was an elegant way to keep the header around in the hadoop workflow, but it currently seems that there is no such solution (http://stackoverflow.com/questions/30052859/hadoop-mapreduce-handling-a-text-file-with-a-header). This class VcfRow must implement WritableComparable to be serialized by the hadoop pipeline. It's awfully sloooooow since we need to re-parse the htsjdk.variant.vcf.VCFHeader with a new htsjdk.variant.vcf.VCFCodec for each new variant.
public static class VcfRow implements WritableComparable<VcfRow> {
    private List<String> headerLines;
    private String line;
    private VariantContext ctx = null;
    private VCFHeader header = null;
    private VCFCodec codec = new VCFCodec();

    public VcfRow() {
        this.headerLines = Collections.emptyList();
        this.line = "";
    }

    public VcfRow(List<String> headerLines, String line) {
        this.headerLines = headerLines;
        this.line = line;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.headerLines.size());
        for (int i = 0; i < this.headerLines.size(); ++i) {
            out.writeUTF(this.headerLines.get(i));
        }
        byte array[] = line.getBytes();
        out.writeInt(array.length);
        out.write(array);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        int n = in.readInt();
        this.headerLines = new ArrayList<String>(n);
        for (int i = 0; i < n; ++i)
            this.headerLines.add(in.readUTF());
        n = in.readInt();
        byte array[] = new byte[n];
        in.readFully(array);
        this.line = new String(array);
        this.codec = new VCFCodec();
        this.ctx = null;
        this.header = null;
    }

    public VCFHeader getHeader() {
        if (this.header == null) {
            this.header = (VCFHeader) this.codec.readActualHeader(new MyLineIterator());
        }
        return this.header;
    }

    public VariantContext getVariantContext() {
        if (this.ctx == null) {
            if (this.header == null) getHeader(); // force decode header
            this.ctx = this.codec.decode(this.line);
        }
        return this.ctx;
    }

    @Override
    public int compareTo(VcfRow o) {
        int i = this.getVariantContext().getContig().compareTo(o.getVariantContext().getContig());
        if (i != 0) return i;
        i = this.getVariantContext().getStart() - o.getVariantContext().getStart();
        if (i != 0) return i;
        i = this.getVariantContext().getReference().compareTo(o.getVariantContext().getReference());
        if (i != 0) return i;
        return this.line.compareTo(o.line);
    }

    private class MyLineIterator extends AbstractIterator<String> implements LineIterator {
        int index = 0;

        @Override
        protected String advance() {
            if (index >= headerLines.size()) return null;
            return headerLines.get(index++);
        }
    }
}
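A possible workaround for that slowness (just a sketch of mine, not something implemented in Test.java): since every VcfRow produced from one split shares the same header lines, one could cache, inside VcfRow, one parsed header and its primed codec per distinct list of header lines, so readActualHeader() runs once per task instead of once per record. Something like:

    // Hypothetical addition inside VcfRow, not part of the original code.
    // One primed codec/header pair per distinct list of header lines, shared by
    // all VcfRow instances of the task (assumes java.util.Map/HashMap are imported).
    private static final class ParsedHeader {
        final VCFHeader header;
        final VCFCodec codec;
        ParsedHeader(final VCFHeader header, final VCFCodec codec) {
            this.header = header;
            this.codec = codec;
        }
    }

    private static final Map<List<String>, ParsedHeader> HEADER_CACHE =
            Collections.synchronizedMap(new HashMap<List<String>, ParsedHeader>());

    private ParsedHeader parsedHeader() {
        ParsedHeader parsed = HEADER_CACHE.get(this.headerLines);
        if (parsed == null) {
            final VCFCodec newCodec = new VCFCodec();
            // readActualHeader() returns the VCFHeader and primes the codec for decode()
            final VCFHeader newHeader = (VCFHeader) newCodec.readActualHeader(new MyLineIterator());
            parsed = new ParsedHeader(newHeader, newCodec);
            HEADER_CACHE.put(this.headerLines, parsed);
        }
        return parsed;
    }

getHeader() and getVariantContext() would then go through parsedHeader() instead of the per-instance codec; I haven't benchmarked how much this actually helps.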
Then a special InputFormat is created for the VCF format. As we need to keep a trace of the header, this class declares `isSplitable==false`. The class VcfInputFormat creates an instance of RecordReader that reads the whole VCF header the first time its method `initialize` is invoked.
This VcfRecordReader then creates a new VcfRow for each data line.

public static class VcfInputFormat extends FileInputFormat<LongWritable, VcfRow> {
    private List<String> headerLines = new ArrayList<String>();

    @Override
    public RecordReader<LongWritable, VcfRow> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new VcfRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    // delegates the actual reading to a LineRecordReader
    private class VcfRecordReader extends RecordReader<LongWritable, VcfRow> {
        private LineRecordReader delegate = new LineRecordReader();

        public VcfRecordReader() throws IOException {
        }

        @Override
        public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
            delegate.initialize(genericSplit, context);
            while (delegate.nextKeyValue()) {
                String row = delegate.getCurrentValue().toString();
                if (!row.startsWith("#"))
                    throw new IOException("Bad VCF header");
                headerLines.add(row);
                if (row.startsWith("#CHROM")) break;
            }
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return delegate.getCurrentKey();
        }

        @Override
        public VcfRow getCurrentValue() throws IOException, InterruptedException {
            Text row = this.delegate.getCurrentValue();
            return new VcfRow(headerLines, row.toString());
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return this.delegate.getProgress();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return this.delegate.nextKeyValue();
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }
    }
}
The hadoop mapper uses the information of each VcfRow and produces a count for each category:
public static class VariantMapper extends Mapper<LongWritable, VcfRow, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, VcfRow vcfRow, Context context) throws IOException, InterruptedException {
        VariantContext ctx = vcfRow.getVariantContext();
        if (ctx.isIndel()) {
            word.set("ctx_indel");
            context.write(word, one);
        }
        if (ctx.isBiallelic()) {
            word.set("ctx_biallelic");
            context.write(word, one);
        }
        if (ctx.isSNP()) {
            word.set("ctx_snp");
            context.write(word, one);
        }
        if (ctx.hasID()) {
            word.set("ctx_id");
            context.write(word, one);
        }
        word.set("ctx_total");
        context.write(word, one);

        for (String sample : vcfRow.getHeader().getSampleNamesInOrder()) {
            Genotype g = vcfRow.getVariantContext().getGenotype(sample);
            word.set(sample + " " + ctx.getType() + " " + g.getType().name());
            context.write(word, one);
        }
    }
}
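As an aside (my variation, not what the code above does), the per-sample lookup by name could be avoided by iterating the genotypes carried by the VariantContext itself; the header would then not be needed in the mapper at all:

        // alternative inner loop (sketch): iterate the genotypes of the context directly
        for (Genotype g : ctx.getGenotypes()) {
            word.set(g.getSampleName() + " " + ctx.getType() + " " + g.getType().name());
            context.write(word, one);
        }

The emitted keys are the same, only the order of emission may differ, which does not matter since the reducer just sums them.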
The Reducer computes the sum of each category:
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
and here is the main program. Note that IntSumReducer is registered both as the combiner and as the reducer, which is safe here because summing counts is associative and commutative:
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "snp count");
    job.setJarByClass(Test.class);
    job.setMapperClass(VariantMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    Path inputPath = new Path(args[0]);
    job.setInputFormatClass(VcfInputFormat.class);
    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Download, compile, Run:
lindenb@hardyweinberg:~/src/hadoop-sandbox$ make -Bn
rm -rf hadoop-2.7.0
curl -L -o hadoop-2.7.0.tar.gz "http://apache.spinellicreations.com/hadoop/common/hadoop-2.7.0/hadoop-2.7.0.tar.gz"
tar xvfz hadoop-2.7.0.tar.gz
rm hadoop-2.7.0.tar.gz
touch -c hadoop-2.7.0/bin/hadoop
rm -rf htsjdk-1.130
curl -L -o 1.130.tar.gz "https://github.com/samtools/htsjdk/archive/1.130.tar.gz"
tar xvfz 1.130.tar.gz
rm 1.130.tar.gz
(cd htsjdk-1.130 && ant )
mkdir -p tmp dist
javac -d tmp -cp hadoop-2.7.0/share/hadoop/common/hadoop-common-2.7.0.jar:hadoop-2.7.0/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.0.jar:hadoop-2.7.0/share/hadoop/common/lib/hadoop-annotations-2.7.0.jar:hadoop-2.7.0/share/hadoop/common/lib/log4j-1.2.17.jar:htsjdk-1.130/dist/commons-logging-1.1.1.jar:htsjdk-1.130/dist/htsjdk-1.130.jar:htsjdk-1.130/dist/commons-jexl-2.1.1.jar:htsjdk-1.130/dist/snappy-java-1.0.3-rc3.jar -sourcepath src/main/java src/main/java/com/github/lindenb/hadoop/Test.java
jar cvf dist/test01.jar -C tmp .
rm -rf tmp
mkdir -p input
curl -o input/CEU.exon.2010_09.genotypes.vcf.gz "ftp://ftp-trace.ncbi.nih.gov/1000genomes/ftp/pilot_data/paper_data_sets/a_map_of_human_variation/exon/snps/CEU.exon.2010_09.genotypes.vcf.gz"
gunzip -f input/CEU.exon.2010_09.genotypes.vcf.gz
rm -rf output
HADOOP_CLASSPATH=htsjdk-1.130/dist/commons-logging-1.1.1.jar:htsjdk-1.130/dist/htsjdk-1.130.jar:htsjdk-1.130/dist/commons-jexl-2.1.1.jar:htsjdk-1.130/dist/snappy-java-1.0.3-rc3.jar hadoop-2.7.0/bin/hadoop jar dist/test01.jar com.github.lindenb.hadoop.Test \
	input/CEU.exon.2010_09.genotypes.vcf output
cat output/*
Here is the (truncated) output of the hadoop job followed by the result of the final `cat`:
15/05/05 17:18:34 INFO input.FileInputFormat: Total input paths to process : 1
15/05/05 17:18:34 INFO mapreduce.JobSubmitter: number of splits:1
15/05/05 17:18:34 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1186897577_0001
15/05/05 17:18:34 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
15/05/05 17:18:34 INFO mapreduce.Job: Running job: job_local1186897577_0001
15/05/05 17:18:34 INFO mapred.LocalJobRunner: OutputCommitter set in config null
15/05/05 17:18:34 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
15/05/05 17:18:34 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
15/05/05 17:18:34 INFO mapred.LocalJobRunner: Waiting for map tasks
15/05/05 17:18:34 INFO mapred.LocalJobRunner: Starting task: attempt_local1186897577_0001_m_000000_0
15/05/05 17:18:34 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
15/05/05 17:18:34 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
15/05/05 17:18:34 INFO mapred.MapTask: Processing split: file:/home/lindenb/src/hadoop-sandbox/input/CEU.exon.2010_09.genotypes.vcf:0+2530564
15/05/05 17:18:34 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
15/05/05 17:18:34 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
15/05/05 17:18:34 INFO mapred.MapTask: soft limit at 83886080
15/05/05 17:18:34 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
15/05/05 17:18:34 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
15/05/05 17:18:34 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
15/05/05 17:18:35 INFO mapreduce.Job: Job job_local1186897577_0001 running in uber mode : false
15/05/05 17:18:35 INFO mapreduce.Job:  map 0% reduce 0%
15/05/05 17:18:36 INFO mapred.LocalJobRunner:
15/05/05 17:18:36 INFO mapred.MapTask: Starting flush of map output
15/05/05 17:18:36 INFO mapred.MapTask: Spilling map output
15/05/05 17:18:36 INFO mapred.MapTask: bufstart = 0; bufend = 7563699; bufvoid = 104857600
15/05/05 17:18:36 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 24902536(99610144); length = 1311861/6553600
15/05/05 17:18:38 INFO mapred.MapTask: Finished spill 0
15/05/05 17:18:38 INFO mapred.Task: Task:attempt_local1186897577_0001_m_000000_0 is done. And is in the process of committing
(...)
NA12843 SNP HOM_REF	2515
NA12843 SNP HOM_VAR	242
NA12843 SNP NO_CALL	293
NA12872 SNP HET	394
NA12872 SNP HOM_REF	2282
NA12872 SNP HOM_VAR	188
NA12872 SNP NO_CALL	625
NA12873 SNP HET	336
NA12873 SNP HOM_REF	2253
NA12873 SNP HOM_VAR	184
NA12873 SNP NO_CALL	716
NA12874 SNP HET	357
NA12874 SNP HOM_REF	2395
NA12874 SNP HOM_VAR	229
NA12874 SNP NO_CALL	508
NA12878 SNP HET	557
NA12878 SNP HOM_REF	2631
NA12878 SNP HOM_VAR	285
NA12878 SNP NO_CALL	16
NA12889 SNP HET	287
NA12889 SNP HOM_REF	2110
NA12889 SNP HOM_VAR	112
NA12889 SNP NO_CALL	980
NA12890 SNP HET	596
NA12890 SNP HOM_REF	2587
NA12890 SNP HOM_VAR	251
NA12890 SNP NO_CALL	55
NA12891 SNP HET	609
NA12891 SNP HOM_REF	2591
NA12891 SNP HOM_VAR	251
NA12891 SNP NO_CALL	38
NA12892 SNP HET	585
NA12892 SNP HOM_REF	2609
NA12892 SNP HOM_VAR	236
NA12892 SNP NO_CALL	59
ctx_biallelic	3489
ctx_id	3489
ctx_snp	3489
ctx_total	3489
that's it,
Pierre