15 June 2011

Sorting and Joining some Genotypes with Hadoop.



This post describes how I've used Hadoop to merge two large files produced by our Affymetrix Axiom genotyping platform.
(Note: working with the latest version of Hadoop has been a headache: it contains classes with the same name ('org.apache.hadoop.mapreduce.Reducer' and 'org.apache.hadoop.mapred.Reducer'), and many classes have been deprecated (e.g. org.apache.hadoop.mapred.JobConf) but are still needed to run some specific algorithms (`jobConf.setInputFormat(CompositeInputFormat.class)`), etc.)

The Input Files


The annotation file

This file describes the genotyped markers (chrom, position, alleleA, alleleB, etc.).

(... header ...)
"Probe Set ID","dbSNP RS ID","Chromosome","Physical Position","Strand","ChrX pseudo-autosomal region 1","Cytoband","Flank","Allele A","Allele B","Associated Gene","Genetic Map","Microsatellite","Allele Frequencies","Heterozygous Allele Frequencies","Number of individuals/Number of chromosomes","In Hapmap","Strand Versus dbSNP","Probe Count","ChrX pseudo-autosomal region 2","Minor Allele","Minor Allele Frequency","OMIM"
"AX-11086612","rs10001348","4","29912308","+","0","p15.1","gtattcagttgaacacaaatcagtgcatgt[A/G]","A","G","ENST00000467087 // downstream // 2885305 // Hs.724550 // STIM2 // 57620 // stromal interaction molecule 2 /// ENST00000361762 // upstream // 809728 // Hs.479439 // PCDH7 // 5099 // protocadherin 7 /// NM_001169117 // downstream // 2885305 // Hs.724550 // STIM2 // 57620 // stromal interaction molecule 2 /// NM_032456 // upstream // 809728 // Hs.724529 // PCDH7 // 5099 // protocadherin 7","50.0229786923511 // D4S418 // D4S2408 // --- // --- /// 44.2128449948474 // D4S2397 // D4S2430 // ATA27C07 // GCT6F03 /// 42.4637111703432 // --- // --- // 226002 // 46437","D4S333 // upstream // 47955 /// D4S605 // downstream // 97312","0.872881356 // 0.127118644 // Caucasian /// 0.833333333 // 0.166666667 // Han Chinese /// 0.777777778 // 0.222222222 // Japanese /// 0.775 // 0.225 // Yoruban","0.254237288 // Caucasian /// 0.288888889 // Han Chinese /// 0.355555556 // Japanese /// 0.35 // Yoruban","60.0 // Caucasian /// 45.0 // Han Chinese /// 45.0 // Japanese /// 60.0 // Yoruban","YES","same","1","0","G // Caucasian /// G // Han Chinese /// G // Japanese /// G // Yoruban","0.127118644 // Caucasian /// 0.166666667 // Han Chinese /// 0.222222222 // Japanese /// 0.225 // Yoruban","---"
"AX-11086611","rs10001340","4","130341127","+","0","q28.2","[A/C]agggcattcatctcagcttactatttgggaaaaat","A","C","ENST00000281146 // downstream // 306645 // Hs.567679 // C4orf33 // 132321 // chromosome 4 open reading frame 33 /// ENST00000394248 // upstream // 3729342 // Hs.192859 // PCDH10 // 57575 // protocadherin 10 /// NM_173487 // downstream // 307285 // Hs.567679 // C4orf33 // 132321 // chromosome 4 open reading frame 33 /// NM_020815 // upstream // 3729342 // Hs.192859 // PCDH10 // 57575 // protocadherin 10","127.864057946266 // D4S1615 // D4S2365 // --- // --- /// 129.756132396152 // D4S2938 // D4S2365 // AFMA284WG5 // GATA10A12 /// 124.03426335901 // D4S2394 // --- // --- // 55218","D4S3198 // upstream // 331274 /// D4S2394 // downstream // 43310","0.815789474 // 0.184210526 // Caucasian /// 1.0 // 0.0 // Han Chinese /// 1.0 // 0.0 // Japanese /// 0.816666667 // 0.183333333 // Yoruban","0.368421053 // Caucasian /// 0.0 // Han Chinese /// 0.0 // Japanese /// 0.266666667 // Yoruban","60.0 // Caucasian /// 45.0 // Han Chinese /// 45.0 // Japanese /// 60.0 // Yoruban","YES","same","1","0","C // Caucasian /// C // Han Chinese /// C // Japanese /// C // Yoruban","0.184210526 // Caucasian /// 0.0 // Han Chinese /// 0.0 // Japanese /// 0.183333333 // Yoruban","---"
"AX-11086610","rs10001337","4","54351529","+","0","q12","atgaggagtagccacatgatctaagcacct[C/T]","T","C","ENST00000306888 // --- // 0 // Hs.518760 // LNX1 // 84708 // ligand of numb-protein X 1 /// ENST00000263925 // --- // 0 // Hs.518760 // LNX1 // 84708 // ligand of numb-protein X 1 /// NM_001126328 // intron // 0 // Hs.518760 // LNX1 // 84708 // ligand of numb-protein X 1 /// NM_032622 // intron // 0 // Hs.518760 // LNX1 // 84708 // ligand of numb-protein X 1","67.4182086016315 // D4S2971 // D4S1594 // --- // --- /// 62.2091955728879 // D4S2971 // UNKNOWN // AFMB312YG1 // GATA61B02 /// 61.6059658777947 // --- // GATA61B02 // 149925 // ---","D4S461 // upstream // 151923 /// D4S2583E // downstream // 24481","0.118644068 // 0.881355932 // Caucasian /// 0.111111111 // 0.888888889 // Han Chinese /// 0.122222222 // 0.877777778 // Japanese /// 0.025 // 0.975 // Yoruban","0.203389831 // Caucasian /// 0.133333333 // Han Chinese /// 0.244444444 // Japanese /// 0.05 // Yoruban","60.0 // Caucasian /// 45.0 // Han Chinese /// 45.0 // Japanese /// 60.0 // Yoruban","YES","same","1","0","T // Caucasian /// T // Han Chinese /// T // Japanese /// T // Yoruban","0.118644068 // Caucasian /// 0.111111111 // Han Chinese /// 0.122222222 // Japanese /// 0.025 // Yoruban","---"
(...)

Calls

This file is returned by the Axiom machine. Each number encodes a genotype (AA/AB/BB/nil).
(..header...)
probeset_id A05.CEL A06.CEL A03.CEL A04.CEL A01.CEL A10.CEL A02.CEL A11.CEL A12.CEL A09.CEL A07.CEL A08.CEL B01.CEL B02.CEL B06.CEL B03.CEL B04.CEL B05.CEL B07.CEL B08.CEL B09.CEL B10.CEL C02.CEL B11.CEL B12.CEL C01.CEL C03.CEL C04.CEL C05.CEL C06.CEL C07.CEL C08.CEL C09.CEL C10.CEL C11.CEL C12.CEL D01.CEL D02.CEL D03.CEL D04.CEL D05.CEL D06.CEL D07.CEL D08.CEL D09.CEL D10.CEL D11.CEL D12.CEL E01.CEL E02.CEL E03.CEL E04.CEL E05.CEL E06.CEL E07.CEL E08.CEL E09.CEL E10.CEL E11.CEL E12.CEL F01.CEL F02.CEL F03.CEL F04.CEL F05.CEL F06.CEL F07.CEL F08.CEL H08.CEL H07.CEL G07.CEL G08.CEL H03.CEL H04.CEL H06.CEL H05.CEL G09.CEL G10.CEL H01.CEL H02.CEL G11.CEL G12.CEL G03.CEL G04.CEL H10.CEL H09.CEL F10.CEL F09.CEL G01.CEL G02.CEL F11.CEL F12.CEL G05.CEL G06.CEL H11.CEL H12.CEL
AX-11086525 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 1 1 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 1 0 1 0 1 0 0 1 1 2 1 1 0 0 0 0 1 0 0 1 0 0 1 1 0 1 0 1 1 2 0 1 0 0 1 2 0 2
AX-11086526 2 1 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 2 2 2 2 2 2 2 2 2 2 2 1 2 2 0 2 2 2 1 1 1 2 2 2 2 2 2 2 2 2 1 1 1 2 1 2 1 2 2 0 2 1 2 2 1 2 2 2 2 1 2 2 2 2 2 1 2 1 1 2 2 1 2 2 2 2 2 2 2 2 2 1 2 2 2
AX-11086527 2 0 2 2 2 1 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 2 2 2 2 2 2 2 2 1 2 2 1 1 0 2 1 2 1 2 2 1 1 1 1 2 2 1 0 2 1 2 1 2 2 2 1 2 1 1 1 1 2 2 2 2 2 1 1 0 2 0 1 0 1 0 1 2 1 1 0 1 1 0 2 1 1 1 0 1 1 1 2 2 1 1
(...)

Joined file

.. and this is what I want to obtain at the end:
AX-11086525 3 165621955 rs10000341 T T T T T T T T T T T T T T T G T T T T T T T T T T T T T T T T T T T T T T T G T G T T T T T G T T T T T T T T T T T T T T T T T T T T T T T T T T T G T T T T T G T T T G T T T T T T T T T T T T T T T T T T T T T G T T T T T T T T T G T T T G T T T G T T T T T G T G G G T G T G T T T T T T T T T G T T T T T G T T T T T G T G T T T G T T T G T G G G T T T G T T T T T G G G T T G G
AX-11086526 3 5237152 rs10000142 C C T C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C T C C C C C C C C C C C C C C C C C C C C C C C T C C C C C T T C C C C C C T C T C T C C C C C C C C C C C C C C C C C C C T C T C T C C C T C C C T C C C C C T T C C T C C C C C T C C C C C C C C C T C C C C C C C C C C C T C C C T C T C C C C C T C C C C C C C C C C C C C C C C C C C T C C C C C C C
(...)
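
Each numeric call is expanded into a pair of alleles taken from the annotation file. A small helper illustrating the conversion (a sketch assuming the usual Axiom encoding 0=AA, 1=AB, 2=BB, -1=no call; the method name is mine):

static String callToAlleles(byte call, String alleleA, String alleleB)
{
    switch(call)
    {
        case 0: return alleleA+" "+alleleA; // homozygous A
        case 1: return alleleA+" "+alleleB; // heterozygous
        case 2: return alleleB+" "+alleleB; // homozygous B
        default: return ". .";              // no call; the printed form is an assumption
    }
}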

Setting up Hadoop and HDFS


export JAVA_HOME=/your/path/to/JDK
cd hadoop-0.20.203.0

Change the configuration files.
In "conf/core-site.xml":
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>

in "conf/hdfs-site.xml":
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>

in "conf/mapred-site.xml":
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>localhost:9001</value>
</property>
</configuration>


Set up ssh for passwordless login:
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

Restrict the permissions on ~/.ssh:
$ chmod 700 ~/.ssh/
$ chmod 640 ~/.ssh/authorized_keys

If needed, add an alias in '/etc/hosts' and restart the network: "sudo /etc/rc.d/init.d/network restart"
Format HDFS:
bin/hadoop namenode -format
11/06/15 08:32:13 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = srv-clc-04.u915.irt.univ-nantes.prive3/127.0.0.1
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 0.20.203.0
STARTUP_MSG: build = http://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-203 -r 1099333; compiled by 'oom' on Wed May 4 07:57:50 PDT 2011
************************************************************/
11/06/15 08:32:13 INFO util.GSet: VM type = 64-bit
11/06/15 08:32:13 INFO util.GSet: 2% max memory = 19.1675 MB
11/06/15 08:32:13 INFO util.GSet: capacity = 2^21 = 2097152 entries
11/06/15 08:32:13 INFO util.GSet: recommended=2097152, actual=2097152
11/06/15 08:32:13 INFO namenode.FSNamesystem: fsOwner=lindenb
11/06/15 08:32:13 INFO namenode.FSNamesystem: supergroup=supergroup
11/06/15 08:32:13 INFO namenode.FSNamesystem: isPermissionEnabled=true
11/06/15 08:32:13 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=100
11/06/15 08:32:13 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
11/06/15 08:32:13 INFO namenode.NameNode: Caching file names occuring more than 10 times
11/06/15 08:32:13 INFO common.Storage: Image file of size 113 saved in 0 seconds.
11/06/15 08:32:14 INFO common.Storage: Storage directory /home/lindenb/tmp/HADOOP/dfs/name has been successfully formatted.
11/06/15 08:32:14 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at srv-clc-04.u915.irt.univ-nantes.prive3/127.0.0.1

... and start HDFS:
$ ./bin/start-all.sh 
namenode running as process 1151. Stop it first.
localhost: starting datanode, logging to /home/lindenb/package/hadoop-0.20.203.0/bin/../logs/hadoop-lindenb-datanode-srv-clc-04.u915.irt.univ-nantes.prive3.out
localhost: secondarynamenode running as process 1490. Stop it first.
jobtracker running as process 1588. Stop it first.
localhost: starting tasktracker, logging to /home/lindenb/package/hadoop-0.20.203.0/bin/../logs/hadoop-lindenb-tasktracker-srv-clc-04.u915.irt.univ-nantes.prive3.out

Copy the Axiom data to HDFS:
$ bin/hadoop fs -mkdir myfolder
$ bin/hadoop fs -copyFromLocal ~/Axiom_GW_Hu_SNP.r2.na31.annot.csv myfolder/
$ bin/hadoop fs -copyFromLocal ~/AxiomGT1.calls.txt myfolder/
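
To check that both files really landed on HDFS, they can also be listed programmatically with the FileSystem API (a minimal sketch; the class name ListMyFolder is arbitrary):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListMyFolder
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        // same value as fs.default.name in conf/core-site.xml
        conf.set("fs.default.name", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);
        // list the files copied under 'myfolder'
        for (FileStatus status : fs.listStatus(new Path("myfolder")))
        {
            System.out.println(status.getPath() + "\t" + status.getLen() + " bytes");
        }
        fs.close();
    }
}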

The Classes

The annotation file and the genotypes file will both be sorted on the probe-set id, and the two results will then be merged. The following classes implement Writable so that they can be (de)serialized from/to HDFS.

Genotypes

This class stores, for each sample of one marker, one of the four possible genotype codes (0, 1, 2 or -1):
static public class Genotypes implements Writable
{
    private byte array[];

    public Genotypes()
    {
        this(0);
    }
    public Genotypes(int n)
    {
        this.array=new byte[n];
    }
    @Override
    public void readFields(DataInput in) throws IOException
    {
        int n=in.readInt();
        this.array=new byte[n];
        in.readFully(this.array);
    }
    @Override
    public void write(DataOutput out) throws IOException
    {
        out.writeInt(this.array.length);
        out.write(this.array);
    }
}
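
The Writable contract can be verified with a simple round trip through a byte array (a hypothetical test method placed in the same enclosing class, so that the private 'array' field is visible):

public static void testGenotypesRoundTrip() throws IOException
{
    Genotypes g=new Genotypes(3); // three samples, all codes initialized to 0
    java.io.ByteArrayOutputStream buffer=new java.io.ByteArrayOutputStream();
    g.write(new java.io.DataOutputStream(buffer));

    Genotypes copy=new Genotypes();
    copy.readFields(new java.io.DataInputStream(
        new java.io.ByteArrayInputStream(buffer.toByteArray())));
    System.out.println(copy.array.length); // prints 3
}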

Position

A position on the genome.
static public class Position implements WritableComparable<Position>
{
    private String chrom="N/A";
    private int position=-1;

    public Position()
    {
    }
    public Position(String chrom,int position)
    {
        this.chrom=chrom;
        this.position=position;
    }
    public String getChrom()
    {
        return chrom;
    }

    public int getPosition()
    {
        return position;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.chrom=in.readUTF();
        this.position=in.readInt();
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(chrom);
        out.writeInt(position);
    }
    @Override
    public int compareTo(Position o) {
        int i=chrom.compareTo(o.chrom);
        if(i!=0) return i;
        return position - o.position;
    }
    @Override
    public String toString() {
        return chrom+":"+position;
    }
}
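
Note that compareTo() compares chromosomes as plain strings, so "10" sorts before "2"; this doesn't matter here because the join key is the probe-set id, but it is worth remembering if the output is later sorted by position. A tiny illustration (hypothetical snippet, fully qualified to avoid extra imports):

static void showPositionOrdering()
{
    java.util.List<Position> positions=new java.util.ArrayList<Position>();
    positions.add(new Position("2",100));
    positions.add(new Position("10",500));
    java.util.Collections.sort(positions); // uses Position.compareTo
    System.out.println(positions);         // prints [10:500, 2:100]
}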

Marker

This class contains the data available in the annotation file:
static public class Marker implements WritableComparable<Marker>
{
    private Position position=new Position();
    private String alleleA;
    private String alleleB;
    private String probeSetId;
    private String rsid;

    public Marker()
    {
    }

    public Position getPosition() {
        return position;
    }

    public String getAlleleA() {
        return alleleA;
    }

    public String getAlleleB() {
        return alleleB;
    }

    public String getProbeSetId() {
        return probeSetId;
    }

    public String getRsid() {
        return rsid;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.position.readFields(in);
        this.alleleA=in.readUTF();
        this.alleleB=in.readUTF();
        this.probeSetId=in.readUTF();
        this.rsid=in.readUTF();
    }
    @Override
    public void write(DataOutput out) throws IOException {
        this.position.write(out);
        out.writeUTF(this.alleleA);
        out.writeUTF(this.alleleB);
        out.writeUTF(this.probeSetId);
        out.writeUTF(this.rsid);
    }
    @Override
    public int compareTo(Marker o) {
        return position.compareTo(o.position);
    }
    @Override
    public String toString() {
        return this.probeSetId+" "+position;
    }
}

MapReduce for sorting the Annotation file.

We need to split each CSV line and sort the file on the probeSetId. The output will be a key/value [probeSetId(String)/Marker].
Mapper
public static class SortMarkerByProbIdMapper extends Mapper<LongWritable, Text, Text, Marker>
{
    (...)
    @Override
    protected void map(
        LongWritable key,
        Text value,
        Context context)
        throws java.io.IOException, InterruptedException
    {
        //ignore header and comment
        if( value.find("\"Probe Set ID\"")==0 ||
            value.find("#")==0 )
        {
            return;
        }

        List<String> array=splitCSV(new String(value.getBytes(),0,value.getLength()));
        if(array.get(this.chromCol).equals("---")) return;//undefined chromosome

        Marker m=new Marker();
        m.position=new Position(
            array.get(this.chromCol),
            Integer.parseInt(array.get(this.posCol))
            );
        m.alleleA=array.get(this.alleleACol);
        m.alleleB=array.get(this.alleleBCol);
        m.probeSetId=array.get(this.probeSetIdCol);
        m.rsid=array.get(this.rsIdCol);
        context.write(new Text(m.probeSetId),m);
    }
}
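
The splitCSV() helper (and the chromCol, posCol, ... column indexes) are not shown above. A minimal quote-aware splitter could look like this (a sketch; it assumes fields contain no escaped quotes or embedded line breaks):

// hypothetical implementation of the splitCSV() helper used above
private static java.util.List<String> splitCSV(String line)
{
    java.util.List<String> tokens=new java.util.ArrayList<String>();
    StringBuilder current=new StringBuilder();
    boolean inQuotes=false;
    for(int i=0;i<line.length();++i)
    {
        char c=line.charAt(i);
        if(c=='\"')
        {
            inQuotes=!inQuotes;          // toggle quoted state, drop the quote itself
        }
        else if(c==',' && !inQuotes)
        {
            tokens.add(current.toString());
            current.setLength(0);
        }
        else
        {
            current.append(c);
        }
    }
    tokens.add(current.toString());
    return tokens;
}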

Reducer
public static class SortMarkerByProbIdReducer extends Reducer<Text, Marker, Text, Marker>
{
    @Override
    protected void reduce(
        Text key,
        Iterable<Marker> values,
        Context context
        ) throws java.io.IOException, InterruptedException
    {
        Marker marker=null;
        for(Marker i:values)
        {
            if(marker!=null) throw new IOException("Duplicate marker id "+key);
            marker=i;
        }
        if(marker==null) return;
        context.write(key,marker);
    }
}

Job
private void sortMarkersByProbeid(Configuration conf) throws Exception
{
    //JobConf jobConf=new JobConf(conf);

    FileSystem fs=FileSystem.get(conf);
    final Path outPath=new Path(SORTED_MARKERS_PATH);
    final Path inPath=new Path("myfolder/Axiom_GW_Hu_SNP.r2.na31.annot.csv");

    List<String> header=null;
    (...)
    /*
    (...)
    open the file and find the column indexes
    (...)
    */

    Job job = new Job(conf, Hadoop01.class.getName());
    job.setJarByClass(Hadoop01.class);
    job.setMapperClass(SortMarkerByProbIdMapper.class);
    job.setReducerClass(SortMarkerByProbIdReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Marker.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Marker.class);
    job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);

    if(fs.exists(outPath))
    {
        fs.delete(outPath, true);
    }

    FileInputFormat.addInputPath(job, inPath);
    FileOutputFormat.setOutputPath(job, outPath);

    if(!job.waitForCompletion(true) )
    {
        throw new IOException("Cannot complete job");
    }
}
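
The elided block opens the annotation file on HDFS, reads its header and records the indexes of the columns used by the mapper. A possible sketch, assuming the splitCSV() helper shown earlier (how the indexes are then passed to the mapper, e.g. through conf.setInt(...), is also left out of the post):

// hypothetical sketch of the elided step: locate the column indexes in the CSV header
java.io.BufferedReader r=new java.io.BufferedReader(
    new java.io.InputStreamReader(fs.open(inPath)));
String line;
while((line=r.readLine())!=null)
{
    if(line.startsWith("#")) continue; // skip the comment lines
    header=splitCSV(line);             // the first non-comment line is the header
    break;
}
r.close();
int probeSetIdCol = header.indexOf("Probe Set ID");
int chromCol      = header.indexOf("Chromosome");
int posCol        = header.indexOf("Physical Position");
int alleleACol    = header.indexOf("Allele A");
int alleleBCol    = header.indexOf("Allele B");
int rsIdCol       = header.indexOf("dbSNP RS ID");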

MapReduce for sorting the 'Genotypes' file.

The output will be a key/value [probeSetId(String)/Genotypes].
Mapper
public static class SortCallByProbIdMapper extends Mapper<LongWritable, Text, Text, Genotypes>
{
    final private Pattern tab=Pattern.compile("[\t]");

    @Override
    protected void map(
        LongWritable key,
        Text value,
        Context context)
        throws java.io.IOException, InterruptedException
    {
        //ignore header and comment
        if( value.find("probeset_id")==0 ||
            value.find("%")==0 )
        {
            return;
        }

        String tokens[]=tab.split(new String(value.getBytes(),0,value.getLength()));
        Genotypes genotypes=new Genotypes(tokens.length-1);
        for(int i=1;i<tokens.length;++i)
        {
            genotypes.array[i-1]=Byte.parseByte(tokens[i]);
        }
        context.write(new Text(tokens[0]),genotypes);
    }
}

Reducer
public static class SortCallByProbIdReducer extends Reducer<Text, Genotypes, Text, Genotypes>
{
    @Override
    protected void reduce(
        Text key,
        Iterable<Genotypes> values,
        Context context
        ) throws java.io.IOException, InterruptedException
    {
        Genotypes array=null;
        for(Genotypes i:values)
        {
            if(array!=null) throw new IOException("Duplicate marker id "+key);
            array=i;
        }
        if(array==null) return;
        context.write(key,array);
    }
}

Job
public void sortCallsByProbeid(Configuration conf) throws Exception
{
    //JobConf jobConf=new JobConf(conf);

    FileSystem fs=FileSystem.get(conf);
    final Path outPath=new Path(SORTED_CALLS_PATH);
    final Path inPath=new Path("myfolder/AxiomGT1.calls.txt");

    Job job = new Job(conf, Hadoop01.class.getName());
    job.setJarByClass(Hadoop01.class);
    job.setMapperClass(SortCallByProbIdMapper.class);
    job.setReducerClass(SortCallByProbIdReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Genotypes.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Genotypes.class);
    job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);

    if(fs.exists(outPath))
    {
        fs.delete(outPath, true);
    }

    FileInputFormat.addInputPath(job, inPath);
    FileOutputFormat.setOutputPath(job, outPath);

    if(!job.waitForCompletion(true) )
    {
        throw new IOException("Cannot complete job");
    }
}

Joining the two sorted files

It was the most complicated part, as I didn't find much documentation about how to 'join' two files in HDFS with the latest Hadoop API. The following solution seems to work: besides the job configuration, it only needs a reducer that consumes the joined TupleWritable records (a sketch is shown below; see the full source code for the actual implementation):
Reducer
/* Sketch of the join reducer. Assumptions: the TupleWritable holds the Marker
 * (first path of the composed input) followed by the Genotypes (second path),
 * and the calls are encoded 0=AA, 1=AB, 2=BB, -1=no call. The old 'mapred'
 * classes are fully qualified to avoid the name clash mentioned above. */
public static class JoinReducer
    extends org.apache.hadoop.mapred.MapReduceBase
    implements org.apache.hadoop.mapred.Reducer<Text, TupleWritable, Text, Text>
{
    @Override
    public void reduce(
        Text key,
        java.util.Iterator<TupleWritable> values,
        org.apache.hadoop.mapred.OutputCollector<Text, Text> output,
        org.apache.hadoop.mapred.Reporter reporter) throws IOException
    {
        while(values.hasNext())
        {
            TupleWritable tuple=values.next();
            Marker marker=(Marker)tuple.get(0);
            Genotypes genotypes=(Genotypes)tuple.get(1);
            StringBuilder line=new StringBuilder();
            line.append(marker.getPosition().getChrom()).append(' ');
            line.append(marker.getPosition().getPosition()).append(' ');
            line.append(marker.getRsid());
            for(byte call:genotypes.array)
            {
                switch(call)
                {
                    case 0: line.append(' ').append(marker.getAlleleA()).append(' ').append(marker.getAlleleA()); break;
                    case 1: line.append(' ').append(marker.getAlleleA()).append(' ').append(marker.getAlleleB()); break;
                    case 2: line.append(' ').append(marker.getAlleleB()).append(' ').append(marker.getAlleleB()); break;
                    default: line.append(" . ."); break; // no call: printed form is an assumption
                }
            }
            output.collect(key, new Text(line.toString()));
        }
    }
}

Job
private void join(Configuration conf) throws Exception
{
    FileSystem fs=FileSystem.get(conf);
    Path outPath=new Path(JOIN_PATH);
    if(fs.exists(outPath))
    {
        fs.delete(outPath, true);
    }

    final String compose=CompositeInputFormat.compose(
        "inner",
        SequenceFileInputFormat.class,
        new Path(SORTED_MARKERS_PATH),
        new Path(SORTED_CALLS_PATH)
        );
    System.err.println(compose);
    JobConf jobConf = new JobConf(conf, getClass());
    jobConf.setJobName("join");
    jobConf.setInputFormat(CompositeInputFormat.class);
    jobConf.set("mapred.join.expr", compose);

    jobConf.setMapOutputKeyClass(Text.class);
    jobConf.setMapOutputValueClass(TupleWritable.class);
    jobConf.setOutputValueClass(Text.class);//TupleWritable ?
    jobConf.setOutputKeyClass(Text.class);
    jobConf.setOutputFormat(TextOutputFormat.class);
    jobConf.setReducerClass(JoinReducer.class);

    //jobConf.setMapOutputValueClass(Text.class);
    org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(jobConf, outPath);
    JobClient.runJob(jobConf);
}
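
For completeness, a driver placed in the Hadoop01 class can simply chain the three steps (a hypothetical sketch; the actual driver is part of the full source code):

// hypothetical driver chaining the three MapReduce steps
public static void main(String[] args) throws Exception
{
    Configuration conf=new Configuration();
    Hadoop01 app=new Hadoop01();
    app.sortMarkersByProbeid(conf); // sort the annotation file by probe-set id
    app.sortCallsByProbeid(conf);   // sort the calls file by probe-set id
    app.join(conf);                 // inner join of the two sorted SequenceFiles
}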

Full source code



That's it,

Pierre

2 comments:

  1. Nice work.

    The mapred package is the "old api" and the mapreduce is the refactored API. The mapreduce package is not complete and it's recommended you use the mapred package. mapred probably should not have been deprecated.

  2. Very nice explanation. Have you looked into cascading api for hadoop? That will simplify your code.
