Counting the reads in a BAM file using SGE and the Open-MPI library: my notebook.
In this post, I'll describe my first Open MPI program. Open MPI is an open-source implementation of the Message Passing Interface (MPI), "a standardized and portable message-passing system designed to function on a wide variety of parallel computers". My C program takes a list of BAM files, distributes counting jobs across the SGE (Sun/Oracle Grid Engine) nodes, and returns the read counts to a master process.
Downloading and installing
The library was downloaded from http://www.open-mpi.org/software/ompi/v1.6/, compiled with the option --with-sge and installed in '/commun/data/packages/openmpi':
../configure --prefix=/commun/data/packages/openmpi --with-sge
make
make install
Configuring SGE to run MPI-based programs
As described in http://docs.oracle.com/cd/E19080-01/n1.grid.eng6/817-5677/6ml49n2c0/index.html :
$ su
$ source /commun/sge_root/bird/common/settings.csh
$ setenv ARCH lx24-amd64
$ qconf -ap orte
And added:
pe_name            orte
slots              448
user_lists         NONE
xuser_lists        NONE
start_proc_args    /bin/true
stop_proc_args     /bin/true
allocation_rule    $round_robin
control_slaves     TRUE
job_is_first_task  TRUE
urgency_slots      min
accounting_summary FALSE
Add orte to the parallel environment list of the queue:
qconf -mattr queue pe_list "orte" all.q
Using qconf -mconf, I've also set shell_start_mode to 'unix_behavior'.
Algorithm
In the 'main' method, test whether this is the master (rank==0) or a slave process (rank!=0):
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if (rank == 0)
    {
    master(argc-1, &argv[1]);
    }
else
    {
    slave();
    }
MASTER
The master gets the number of available processors:
MPI_Comm_size(MPI_COMM_WORLD, &proc_count);
It then loops over the available processors and the BAMs; for each BAM it creates a fixed-size structure containing the path to the BAM as well as the read counts, initialized to '0':
typedef struct Params_t
    {
    char filename[FILENAME_MAX];
    int is_error;
    long count_total;
    long count_mapped;
    long count_properly_paired;
    long count_dup;
    } Params;
This structure is sent to the slaves:
MPI_Send(
    &param,          /* message buffer */
    sizeof(Params),  /* number of data items (bytes) */
    MPI_CHAR,        /* data item is an array of bytes */
    rank,            /* destination process rank */
    TAG_COUNT_BAM,   /* user chosen message tag */
    MPI_COMM_WORLD   /* default communicator */
    );
and the master waits for the slaves to return those 'Params' with the filled values:
MPI_Recv(
    &param,          /* message buffer */
    sizeof(Params),  /* number of data items (bytes) */
    MPI_CHAR,        /* data item is an array of bytes */
    MPI_ANY_SOURCE,  /* receive from any sender */
    MPI_ANY_TAG,     /* any type of message */
    MPI_COMM_WORLD,  /* default communicator */
    &status);
And the result is printed:
printf("%s\t%ld\t%ld\t%ld\t%ld\n", param.filename, param.count_total, param.count_mapped, param.count_properly_paired, param.count_dup );At the end of the master method, when no more BAM is available, the slaves are released (tag is 'TAG_RELEASE_SLAVE') with :
MPI_Send(
    &empty,            /* message buffer */
    sizeof(Params),    /* number of data items (bytes) */
    MPI_CHAR,          /* data item is an array of bytes */
    rank,              /* destination process rank */
    TAG_RELEASE_SLAVE, /* shutdown */
    MPI_COMM_WORLD     /* default communicator */
    );
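As an aside (this is not part of the original program): the struct is shipped as sizeof(Params) raw MPI_CHAR bytes, which works here because master and slaves run the same binary on the same architecture. A more portable alternative would be to describe Params with an MPI derived datatype. Here is a minimal, untested sketch of how that could look; the helper name make_params_datatype is mine and the code assumes the Params struct shown above:
#include <stdio.h>   /* FILENAME_MAX */
#include <stddef.h>  /* offsetof */
#include <mpi.h>

/* Hypothetical helper: build an MPI datatype matching the layout of Params,
 * so the struct could be exchanged as one element of this type instead of
 * sizeof(Params) elements of MPI_CHAR. */
static MPI_Datatype make_params_datatype(void)
    {
    MPI_Datatype params_type;
    int          block_lengths[3] = { FILENAME_MAX, 1, 4 };
    MPI_Aint     displacements[3];
    MPI_Datatype types[3] = { MPI_CHAR, MPI_INT, MPI_LONG };
    displacements[0] = offsetof(Params, filename);
    displacements[1] = offsetof(Params, is_error);
    displacements[2] = offsetof(Params, count_total); /* the 4 consecutive longs */
    MPI_Type_create_struct(3, block_lengths, displacements, types, &params_type);
    MPI_Type_commit(&params_type);
    return params_type;
    }
A message would then be sent with MPI_Send(&param, 1, params_type, rank, TAG_COUNT_BAM, MPI_COMM_WORLD).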
SLAVE
The slave method receives messages from the master:
MPI_Recv(
    &param,
    sizeof(Params),
    MPI_CHAR,
    0,
    MPI_ANY_TAG,
    MPI_COMM_WORLD,
    &status);
If the message tag is 'TAG_RELEASE_SLAVE', we exit the method. Otherwise the BAM file is scanned using the SAMtools API for C:
count_reads(&param);
and the result is sent back to the master:
MPI_Send(&param, sizeof(Params), MPI_CHAR, 0, 0, MPI_COMM_WORLD);
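Note that MPI_Recv only guarantees that at most sizeof(Params) bytes are written into the buffer. If one wanted to be defensive, the number of bytes actually received can be queried from the status. A small sketch (not in the original code), assuming the 'status' filled by the MPI_Recv above:
int received = 0;
MPI_Get_count(&status, MPI_CHAR, &received); /* how many MPI_CHAR items arrived */
if (received != (int)sizeof(Params))
    {
    fprintf(stderr, "Truncated message: got %d bytes, expected %d\n",
        received, (int)sizeof(Params));
    }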
Running the Job on SGE
I still don't understand why I first need to run the following interactive job:
# qsh -pe orte 4
Your job 84581 ("INTERACTIVE") has been submitted
waiting for interactive job to be scheduled ...
Your interactive job 84581 has been successfully scheduled.
Run the analysis:
qrsh -cwd -pe orte 100 /commun/data/packages/openmpi/bin/mpirun \
    /path/to/ompibam \
    `find /commun/data/projects/folder1234/align -name "*.bam"`
qstat -f shows the jobs running:
queuename                      qtype resv/used/tot. load_avg arch          states
---------------------------------------------------------------------------------
all.q@node01                   BIP   0/15/64        2.76     lx24-amd64
  84598 0.55500 mpirun     lindenb      r     02/11/2013 12:03:36    15
---------------------------------------------------------------------------------
all.q@node02                   BIP   0/14/64        3.89     lx24-amd64
  84598 0.55500 mpirun     lindenb      r     02/11/2013 12:03:36    14
---------------------------------------------------------------------------------
all.q@node03                   BIP   0/14/64        3.23     lx24-amd64
  84598 0.55500 mpirun     lindenb      r     02/11/2013 12:03:36    14
---------------------------------------------------------------------------------
all.q@node04                   BIP   0/14/64        3.68     lx24-amd64
  84598 0.55500 mpirun     lindenb      r     02/11/2013 12:03:36    14
---------------------------------------------------------------------------------
all.q@node05                   BIP   0/15/64        2.91     lx24-amd64
  84598 0.55500 mpirun     lindenb      r     02/11/2013 12:03:36    15
---------------------------------------------------------------------------------
all.q@node06                   BIP   0/14/64        3.91     lx24-amd64
  84598 0.55500 mpirun     lindenb      r     02/11/2013 12:03:36    14
---------------------------------------------------------------------------------
all.q@node07                   BIP   0/14/64        3.79     lx24-amd64
  84598 0.55500 mpirun     lindenb      r     02/11/2013 12:03:36    14
Output:
#filename total mapped properly_paired dup
/commun/data/projects/folder1234/11X0589_markdup.bam 1179766 973943 684794 0
/commun/data/projects/folder1234/11X0589_pair1_sorted.bam 1179766 973943 684794 0
/commun/data/projects/folder1234/11X0589_realigned.bam 1179766 973943 684794 0
/commun/data/projects/folder1234/11X0589_recal.bam 1179766 973943 684794 0
/commun/data/projects/folder1234/11X0602_pair1_sorted.bam 982006 815636 566136 0
/commun/data/projects/folder1234/11X0602_realigned.bam 982006 815636 566136 0
/commun/data/projects/folder1234/11X0602_markdup.bam 982006 815636 566136 0
/commun/data/projects/folder1234/11X0602_pair1_unsorted.bam 982006 815636 566136 0
/commun/data/projects/folder1234/11X0375_pair1_sorted.bam 1255772 1031155 735480 0
/commun/data/projects/folder1234/11X0375_realigned.bam 1255772 1031155 735480 0
/commun/data/projects/folder1234/11X0375_markdup.bam 1255772 1031155 735480 0
/commun/data/projects/folder1234/11X0367_pair1_sorted.bam 1422144 1158315 796466 0
/commun/data/projects/folder1234/11X0367_markdup.bam 1422144 1158315 796466 0
/commun/data/projects/folder1234/11X0375_pair1_unsorted.bam 1255772 1031155 735480 0
/commun/data/projects/folder1234/11X0367_realigned.bam 1422144 1158315 796466 0
/commun/data/projects/folder1234/11X0291_pair1_sorted.bam 1296294 1067233 743924 0
/commun/data/projects/folder1234/11X0291_markdup.bam 1296294 1067233 743924 0
/commun/data/projects/folder1234/11X0291_realigned.bam 1296294 1067233 743924 0
/commun/data/projects/folder1234/11X0311_markdup.bam 1353534 1117463 788474 0
/commun/data/projects/folder1234/11X0311_realigned.bam 1353534 1117463 788474 0
/commun/data/projects/folder1234/11X0375_recal.bam 1255772 1031155 735480 0
/commun/data/projects/folder1234/11X0589_pair1_unsorted.bam 1179766 973943 684794 0
/commun/data/projects/folder1234/11X0367_pair1_unsorted.bam 1422144 1158315 796466 0
/commun/data/projects/folder1234/11X0290_pair1_sorted.bam 1436970 1191044 837726 0
/commun/data/projects/folder1234/11X0290_realigned.bam 1436970 1191044 837726 0
/commun/data/projects/folder1234/11X0291_pair1_unsorted.bam 1296294 1067233 743924 0
/commun/data/projects/folder1234/11X0311_pair1_sorted.bam 1353534 1117463 788474 0
/commun/data/projects/folder1234/11X0240_pair1_sorted.bam 1489672 1229654 830042 0
/commun/data/projects/folder1234/11X0240_realigned.bam 1489672 1229654 830042 0
/commun/data/projects/folder1234/11X0240_markdup.bam 1489672 1229654 830042 0
/commun/data/projects/folder1234/XX10272_pair1_sorted.bam 1579608 1296449 876040 0
/commun/data/projects/folder1234/XX10272_markdup.bam 1579608 1296449 876040 0
/commun/data/projects/folder1234/XX10272_realigned.bam 1579608 1296449 876040 0
/commun/data/projects/folder1234/11X0602_recal.bam 982006 815636 566136 0
/commun/data/projects/folder1234/11X0290_markdup.bam 1436970 1191044 837726 0
/commun/data/projects/folder1234/11X0290_pair1_unsorted.bam 1436970 1191044 837726 0
/commun/data/projects/folder1234/11X0247_pair1_sorted.bam 1489832 1233208 862842 0
/commun/data/projects/folder1234/11X0456_pair1_sorted.bam 1486490 1220969 848738 0
/commun/data/projects/folder1234/11X0456_realigned.bam 1486490 1220969 848738 0
/commun/data/projects/folder1234/11X0247_markdup.bam 1489832 1233208 862842 0
/commun/data/projects/folder1234/11X0247_realigned.bam 1489832 1233208 862842 0
/commun/data/projects/folder1234/11X0456_markdup.bam 1486490 1220969 848738 0
/commun/data/projects/folder1234/11X0367_recal.bam 1422144 1158315 796466 0
/commun/data/projects/folder1234/11X0305_pair1_sorted.bam 1818236 1489267 1027458 0
/commun/data/projects/folder1234/11X0305_markdup.bam 1818236 1489267 1027458 0
/commun/data/projects/folder1234/11X0305_realigned.bam 1818236 1489267 1027458 0
/commun/data/projects/folder1234/11X0542_pair1_sorted.bam 1784860 1469183 1016314 0
/commun/data/projects/folder1234/XX07551_recal.bam 1778358 1456518 1018694 0
/commun/data/projects/folder1234/11X0542_realigned.bam 1784860 1469183 1016314 0
/commun/data/projects/folder1234/11X0542_markdup.bam 1784860 1469183 1016314 0
/commun/data/projects/folder1234/11X0311_pair1_unsorted.bam 1353534 1117463 788474 0
/commun/data/projects/folder1234/11X0291_recal.bam 1296294 1067233 743924 0
/commun/data/projects/folder1234/XX07551_pair1_sorted.bam 1778358 1456518 1018694 0
/commun/data/projects/folder1234/11X0240_pair1_unsorted.bam 1489672 1229654 830042 0
/commun/data/projects/folder1234/XX10272_pair1_unsorted.bam 1579608 1296449 876040 0
/commun/data/projects/folder1234/XX07551_markdup.bam 1778358 1456518 1018694 0
/commun/data/projects/folder1234/XX07551_realigned.bam 1778358 1456518 1018694 0
/commun/data/projects/folder1234/11X0305_pair1_unsorted.bam 1818236 1489267 1027458 0
/commun/data/projects/folder1234/11X0311_recal.bam 1353534 1117463 788474 0
/commun/data/projects/folder1234/11X0456_pair1_unsorted.bam 1486490 1220969 848738 0
/commun/data/projects/folder1234/11X0247_pair1_unsorted.bam 1489832 1233208 862842 0
/commun/data/projects/folder1234/11X0542_pair1_unsorted.bam 1784860 1469183 1016314 0
/commun/data/projects/folder1234/XX07551_pair1_unsorted.bam 1778358 1456518 1018694 0
/commun/data/projects/folder1234/11X0433_pair1_sorted.bam 2178798 1800943 1249738 0
/commun/data/projects/folder1234/11X0433_markdup.bam 2178798 1800943 1249738 0
/commun/data/projects/folder1234/XX10272_recal.bam 1579608 1296449 876040 0
/commun/data/projects/folder1234/11X0433_realigned.bam 2178798 1800943 1249738 0
/commun/data/projects/folder1234/11X0240_recal.bam 1489672 1229654 830042 0
/commun/data/projects/folder1234/11X0290_recal.bam 1436970 1191044 837726 0
/commun/data/projects/folder1234/11X0456_recal.bam 1486490 1220969 848738 0
/commun/data/projects/folder1234/11X0247_recal.bam 1489832 1233208 862842 0
/commun/data/projects/folder1234/11X0433_pair1_unsorted.bam 2178798 1800943 1249738 0
/commun/data/projects/folder1234/11X0305_recal.bam 1818236 1489267 1027458 0
/commun/data/projects/folder1234/11X0542_recal.bam 1784860 1469183 1016314 0
/commun/data/projects/folder1234/11X0433_recal.bam 2178798 1800943 1249738 0
However, I noticed that running this code on the cluster is slower than running it on the master node...
The C code
Note: the C code is compiled with ortecc, not gcc.
SAMDIR=/commun/data/packages/samtools-0.1.18
OPENMPI.dir=/commun/data/packages/openmpi
CC=$(OPENMPI.dir)/bin/ortecc

.PHONY: all
all: ./ompibam

./ompibam: ompibam.c
	$(CC) -o $@ -O3 -I$(SAMDIR) \
		-Wl,-rpath -Wl,$(OPENMPI.dir)/lib \
		-L$(SAMDIR) -Wall $< -lmpi -lbam -lz
/**
 * Author : Pierre Lindenbaum PhD
 * Date : 2013
 * WWW: http://plindenbaum.blogspot.com
 * Motivation:
 *   my first Open-MPI source. Takes a list of BAM files
 *   and counts the number of reads.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include "sam.h"

/** message sent to a node: "count the reads in a bam" */
#define TAG_COUNT_BAM 1
/** message sent to a node: "shutdown" */
#define TAG_RELEASE_SLAVE 2

/**
 * fixed-size structure sent to the nodes: contains the BAM path and the read counts
 */
typedef struct Params_t
	{
	char filename[FILENAME_MAX];
	int is_error;
	long count_total;
	long count_mapped;
	long count_properly_paired;
	long count_dup;
	} Params;

#define IS_FLAG_SET(b,f) ((b->core.flag & (f))!=0)

/**
 * count the reads in a BAM file using the SAMTOOLS API
 * and fill the structure Params*
 */
static void count_reads(Params* p)
	{
	int ret;
	bam1_t* b;
	bamFile fp=bam_open(p->filename,"r");
	bam_header_t *header;
	if(fp==0)
		{
		fprintf(stderr,"Cannot open %s\n",p->filename);
		p->is_error=1;
		return;
		}
	header=bam_header_read(fp);
	b=bam_init1();
	while((ret=bam_read1(fp,b))>=0)
		{
		p->count_total++;
		if(IS_FLAG_SET(b,BAM_FUNMAP)) continue;
		p->count_mapped++;
		if(IS_FLAG_SET(b,BAM_FPROPER_PAIR))
			{
			p->count_properly_paired++;
			}
		if(IS_FLAG_SET(b,BAM_FDUP))
			{
			p->count_dup++;
			}
		}
	bam_destroy1(b);
	bam_header_destroy(header);
	bam_close(fp);
	}

/**
 * This is the slave method: it receives a 'Params' message from the master.
 * If the tag is TAG_RELEASE_SLAVE the slave is released,
 * else the bam in param.filename is analyzed and the message is sent back to the master.
 */
static void slave(void)
	{
	for(;;)
		{
		MPI_Status status;
		Params param;
		/* Receive a message from the master */
		MPI_Recv(
			&param,
			sizeof(Params),
			MPI_CHAR,
			0,
			MPI_ANY_TAG,
			MPI_COMM_WORLD,
			&status);
		if(status.MPI_TAG==TAG_RELEASE_SLAVE)
			{
			break;
			}
		count_reads(&param);
		/* Send the result back */
		MPI_Send(
			&param,
			sizeof(Params),
			MPI_CHAR,
			0,
			0,
			MPI_COMM_WORLD
			);
		}
	}

/**
 * this is the master method: while there is
 * a bam in the queue, send the analysis to the slaves.
 * At the end, release the slaves.
 */
static void master(const int n_bams,char** argv)
	{
	int rank=0;
	int optind=0;
	int proc_count=0;
	/* get the number of available processors in proc_count */
	if(MPI_Comm_size(MPI_COMM_WORLD, &proc_count)!=MPI_SUCCESS)
		{
		return;
		}
	printf("#filename\ttotal\tmapped\tproperly_paired\tdup\n");
	/* loop over the bam files */
	while(optind < n_bams)
		{
		int i=0;
		/* number of messages sent */
		int count_sent=0;
		/* loop over the available procs */
		for(rank=1;
			rank<proc_count && optind+count_sent < n_bams;
			++rank)
			{
			/* create a new param */
			Params param;
			/* fill it with '0' */
			memset((void*)&param,0,sizeof(Params));
			/* set the BAM filename */
			strcpy(param.filename,argv[count_sent+optind]);
			/* Send it to this rank */
			MPI_Send(
				&param,          /* message buffer */
				sizeof(Params),  /* number of data items (bytes) */
				MPI_CHAR,        /* data item is an array of bytes */
				rank,            /* destination process rank */
				TAG_COUNT_BAM,   /* user chosen message tag */
				MPI_COMM_WORLD   /* default communicator */
				);
			++count_sent;
			}
		/* now we get the results */
		for(i=0;i< count_sent;++i)
			{
			/* get a message from a slave */
			Params param;
			MPI_Status status;
			MPI_Recv(&param,     /* message buffer */
				sizeof(Params),  /* number of data items (bytes) */
				MPI_CHAR,        /* data item is an array of bytes */
				MPI_ANY_SOURCE,  /* receive from any sender */
				MPI_ANY_TAG,     /* any type of message */
				MPI_COMM_WORLD,  /* default communicator */
				&status);        /* info about the received message */
			if(param.is_error!=0)
				{
				fprintf(stderr,"Error for %s\n",param.filename);
				}
			else /* print the result */
				{
				printf("%s\t%ld\t%ld\t%ld\t%ld\n",
					param.filename,
					param.count_total,
					param.count_mapped,
					param.count_properly_paired,
					param.count_dup
					);
				}
			}
		if(count_sent==0)
			{
			break;
			}
		optind+=count_sent;
		}
	/* shutdown the slaves */
	for(rank=1;rank<proc_count;++rank)
		{
		Params empty;
		MPI_Send(
			&empty,             /* message buffer */
			sizeof(Params),     /* number of data items (bytes) */
			MPI_CHAR,           /* data item is an array of bytes */
			rank,               /* destination process rank */
			TAG_RELEASE_SLAVE,  /* shutdown */
			MPI_COMM_WORLD      /* default communicator */
			);
		}
	}

int main(int argc, char *argv[])
	{
	int rank=0;
	/* Initialisation */
	MPI_Init(&argc, &argv);
	/* Find out my identity in the default communicator */
	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
	/* rank is zero: I'm the master */
	if(rank==0)
		{
		master(argc-1,&argv[1]);
		}
	else /* I'm a slave */
		{
		slave();
		}
	/* Shut down MPI */
	MPI_Finalize();
	return EXIT_SUCCESS;
	}
I'm still very new to this API and to those concepts. Feel free to suggest anything :-)
That's it,
Pierre
1 comment:
Hi Pierre,
Thanks a ton for this post, I'm going to try this out on my cluster here. I have a tool which iterates through a bcf/vcf file and computes a statistic at each locus. I'll let you know if I can parallelize that using OpenMP !
Thanks again,
Avinash