11 February 2013

Counting the reads in a BAM file using SGE and the Open-MPI library: my notebook.

In this post, I'll describe my first Open MPI program. Open MPI is an implementation of the Message Passing Interface (MPI), "a standardized and portable message-passing system designed to function on a wide variety of parallel computers". My C program takes a list of BAM files, distributes counting jobs to slave processes running under SGE (the Sun/Oracle Grid Engine), and returns the read counts to a master process.

Downloading and installing

The library was downloaded from http://www.open-mpi.org/software/ompi/v1.6/, compiled with the option --with-sge and installed in '/commun/data/packages/openmpi'.
./configure --prefix=/commun/data/packages/openmpi --with-sge
make 
make install
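To check that the SGE support was really compiled in, the following command should (if I read the documentation correctly) list the gridengine components of Open MPI:
/commun/data/packages/openmpi/bin/ompi_info | grep gridengine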

Configuring SGE to run MPI-based programs

As described in http://docs.oracle.com/cd/E19080-01/n1.grid.eng6/817-5677/6ml49n2c0/index.html .
$ su
$ source /commun/sge_root/bird/common/settings.csh
$ setenv ARCH lx24-amd64
$ qconf -ap orte
And added:
pe_name            orte
slots              448
user_lists         NONE
xuser_lists        NONE
start_proc_args    /bin/true
stop_proc_args     /bin/true
allocation_rule    $round_robin
control_slaves     TRUE
job_is_first_task  TRUE
urgency_slots      min
accounting_summary FALSE
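(The allocation_rule $round_robin tells SGE to spread the allocated slots across the hosts one at a time, which is what the qstat -f output below shows.)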
Add orte to the parallel environment list of the queue:
qconf -mattr queue pe_list "orte" all.q
Using qconf -mconf, I also set shell_start_mode to 'unix_behavior'.
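The new parallel environment can be displayed with qconf -sp orte.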

Algorithm

In the 'main' method (MPI_Init must have been called first; see the full listing below), each process asks for its rank to find out whether it is the master (rank==0) or a slave (rank!=0):
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if (rank == 0)
 {
 master(argc-1,&argv[1]);
 }
else
 {
 slave(); /* a slave gets its work from MPI messages, not from argv */
 }

MASTER

The master gets the number of available processes:
MPI_Comm_size(MPI_COMM_WORLD, &proc_count)
It then loops over the available processes and the BAMs. For each BAM, it creates a fixed-size structure containing the path to the BAM as well as the counts of reads, initialized to '0':
typedef struct Params_t
 {
 char filename[FILENAME_MAX];
 int is_error;
 long count_total;
 long count_mapped;
 long count_properly_paired;
 long count_dup;
 }Params;
This structure is sent to the slaves:
 MPI_Send(
 &param, /* message buffer */
 sizeof(Params), /* message length, in bytes */
 MPI_CHAR, /* the struct is sent as raw bytes */
 rank, /* destination process rank */
 TAG_COUNT_BAM, /* user-chosen message tag */
 MPI_COMM_WORLD /* default communicator */
 );
and we wait for the slaves to return those 'Params' with the counts filled in:
MPI_Recv(&param, /* message buffer */
 sizeof(Params), /* message length, in bytes */
 MPI_CHAR, /* the struct is received as raw bytes */
 MPI_ANY_SOURCE, /* receive from any sender */
 MPI_ANY_TAG, /* any type of message */
 MPI_COMM_WORLD, /* default communicator */
 &status);
And the result is printed:
printf("%s\t%ld\t%ld\t%ld\t%ld\n",
 param.filename,
 param.count_total,
 param.count_mapped,
 param.count_properly_paired,
 param.count_dup
 );
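Note that the structure travels over the wire as sizeof(Params) raw MPI_CHAR bytes, which is fine here because all the nodes share the same architecture. On a heterogeneous cluster, a derived datatype would be safer. Here is a minimal sketch (make_params_type is a hypothetical helper, not part of my program; it assumes #include <stddef.h> for offsetof):

/* Hypothetical sketch: describe Params to MPI with a derived datatype,
 * so MPI_Send/MPI_Recv can use count=1 and this type instead of raw bytes. */
static MPI_Datatype make_params_type(void)
    {
    MPI_Datatype param_type;
    int lengths[3] = {FILENAME_MAX, 1, 4}; /* filename, is_error, the 4 long counters */
    MPI_Aint offsets[3] = {
        offsetof(Params, filename),
        offsetof(Params, is_error),
        offsetof(Params, count_total)
        };
    MPI_Datatype types[3] = {MPI_CHAR, MPI_INT, MPI_LONG};
    MPI_Type_create_struct(3, lengths, offsets, types, &param_type);
    MPI_Type_commit(&param_type); /* must be committed before first use */
    return param_type; /* release it with MPI_Type_free() when done */
    }

(For sending arrays of Params rather than one at a time, the type would also need MPI_Type_create_resized to account for trailing padding.)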
At the end of the master method, when no more BAMs are available, the slaves are released with the tag TAG_RELEASE_SLAVE:
MPI_Send(
 &empty, /* message buffer */
 sizeof(Params), /* message length, in bytes */
 MPI_CHAR, /* the struct is sent as raw bytes */
 rank, /* destination process rank */
 TAG_RELEASE_SLAVE, /* shutdown */
 MPI_COMM_WORLD /* default communicator */
 );

SLAVE

The slave method loops, receiving messages from the master:
MPI_Recv(
 &param,
 sizeof(Params),
 MPI_CHAR,
 0,
 MPI_ANY_TAG,
 MPI_COMM_WORLD,
 &status);
If the message tag is TAG_RELEASE_SLAVE, we exit the method. Otherwise the BAM file is scanned using the SAMtools C API:
count_reads(&param);
and the result is sent back to the master:
MPI_Send(
 &param,
 sizeof(Params),
 MPI_CHAR,
 0,
 0,
 MPI_COMM_WORLD
 );

Running the Job on SGE

I still don't understand why I need to write the following line:
# qsh -pe orte 4
Your job 84581 ("INTERACTIVE") has been submitted
waiting for interactive job to be scheduled ...
Your interactive job 84581 has been successfully scheduled.
Run the analysis:
qrsh -cwd -pe orte 100 /commun/data/packages/openmpi/bin/mpirun \
 /path/to/ompibam \
 `find /commun/data/projects/folder1234/align -name "*.bam"`
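Because Open MPI was compiled with --with-sge, mpirun gets the list of slots allocated by the 'orte' parallel environment directly from SGE: there is no need for a hostfile.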
qstat -f shows the jobs running:
queuename                      qtype resv/used/tot. load_avg arch          states
   ---------------------------------------------------------------------------------
   all.q@node01                   BIP   0/15/64        2.76 lx24-amd64
      84598 0.55500 mpirun     lindenb      r     02/11/2013 12:03:36    15
   ---------------------------------------------------------------------------------
   all.q@node02                   BIP   0/14/64        3.89 lx24-amd64
      84598 0.55500 mpirun     lindenb      r     02/11/2013 12:03:36    14
   ---------------------------------------------------------------------------------
   all.q@node03                   BIP   0/14/64        3.23 lx24-amd64
      84598 0.55500 mpirun     lindenb      r     02/11/2013 12:03:36    14
   ---------------------------------------------------------------------------------
   all.q@node04                   BIP   0/14/64        3.68 lx24-amd64
      84598 0.55500 mpirun     lindenb      r     02/11/2013 12:03:36    14
   ---------------------------------------------------------------------------------
   all.q@node05                   BIP   0/15/64        2.91 lx24-amd64
      84598 0.55500 mpirun     lindenb      r     02/11/2013 12:03:36    15
   ---------------------------------------------------------------------------------
   all.q@node06                   BIP   0/14/64        3.91 lx24-amd64
      84598 0.55500 mpirun     lindenb      r     02/11/2013 12:03:36    14
   ---------------------------------------------------------------------------------
   all.q@node07                   BIP   0/14/64        3.79 lx24-amd64
      84598 0.55500 mpirun     lindenb      r     02/11/2013 12:03:36    14 
Output:
#filename total mapped properly_paired dup
/commun/data/projects/folder1234/11X0589_markdup.bam 1179766 973943 684794 0
/commun/data/projects/folder1234/11X0589_pair1_sorted.bam 1179766 973943 684794 0
/commun/data/projects/folder1234/11X0589_realigned.bam 1179766 973943 684794 0
/commun/data/projects/folder1234/11X0589_recal.bam 1179766 973943 684794 0
/commun/data/projects/folder1234/11X0602_pair1_sorted.bam 982006 815636 566136 0
/commun/data/projects/folder1234/11X0602_realigned.bam 982006 815636 566136 0
/commun/data/projects/folder1234/11X0602_markdup.bam 982006 815636 566136 0
/commun/data/projects/folder1234/11X0602_pair1_unsorted.bam 982006 815636 566136 0
/commun/data/projects/folder1234/11X0375_pair1_sorted.bam 1255772 1031155 735480 0
/commun/data/projects/folder1234/11X0375_realigned.bam 1255772 1031155 735480 0
/commun/data/projects/folder1234/11X0375_markdup.bam 1255772 1031155 735480 0
/commun/data/projects/folder1234/11X0367_pair1_sorted.bam 1422144 1158315 796466 0
/commun/data/projects/folder1234/11X0367_markdup.bam 1422144 1158315 796466 0
/commun/data/projects/folder1234/11X0375_pair1_unsorted.bam 1255772 1031155 735480 0
/commun/data/projects/folder1234/11X0367_realigned.bam 1422144 1158315 796466 0
/commun/data/projects/folder1234/11X0291_pair1_sorted.bam 1296294 1067233 743924 0
/commun/data/projects/folder1234/11X0291_markdup.bam 1296294 1067233 743924 0
/commun/data/projects/folder1234/11X0291_realigned.bam 1296294 1067233 743924 0
/commun/data/projects/folder1234/11X0311_markdup.bam 1353534 1117463 788474 0
/commun/data/projects/folder1234/11X0311_realigned.bam 1353534 1117463 788474 0
/commun/data/projects/folder1234/11X0375_recal.bam 1255772 1031155 735480 0
/commun/data/projects/folder1234/11X0589_pair1_unsorted.bam 1179766 973943 684794 0
/commun/data/projects/folder1234/11X0367_pair1_unsorted.bam 1422144 1158315 796466 0
/commun/data/projects/folder1234/11X0290_pair1_sorted.bam 1436970 1191044 837726 0
/commun/data/projects/folder1234/11X0290_realigned.bam 1436970 1191044 837726 0
/commun/data/projects/folder1234/11X0291_pair1_unsorted.bam 1296294 1067233 743924 0
/commun/data/projects/folder1234/11X0311_pair1_sorted.bam 1353534 1117463 788474 0
/commun/data/projects/folder1234/11X0240_pair1_sorted.bam 1489672 1229654 830042 0
/commun/data/projects/folder1234/11X0240_realigned.bam 1489672 1229654 830042 0
/commun/data/projects/folder1234/11X0240_markdup.bam 1489672 1229654 830042 0
/commun/data/projects/folder1234/XX10272_pair1_sorted.bam 1579608 1296449 876040 0
/commun/data/projects/folder1234/XX10272_markdup.bam 1579608 1296449 876040 0
/commun/data/projects/folder1234/XX10272_realigned.bam 1579608 1296449 876040 0
/commun/data/projects/folder1234/11X0602_recal.bam 982006 815636 566136 0
/commun/data/projects/folder1234/11X0290_markdup.bam 1436970 1191044 837726 0
/commun/data/projects/folder1234/11X0290_pair1_unsorted.bam 1436970 1191044 837726 0
/commun/data/projects/folder1234/11X0247_pair1_sorted.bam 1489832 1233208 862842 0
/commun/data/projects/folder1234/11X0456_pair1_sorted.bam 1486490 1220969 848738 0
/commun/data/projects/folder1234/11X0456_realigned.bam 1486490 1220969 848738 0
/commun/data/projects/folder1234/11X0247_markdup.bam 1489832 1233208 862842 0
/commun/data/projects/folder1234/11X0247_realigned.bam 1489832 1233208 862842 0
/commun/data/projects/folder1234/11X0456_markdup.bam 1486490 1220969 848738 0
/commun/data/projects/folder1234/11X0367_recal.bam 1422144 1158315 796466 0
/commun/data/projects/folder1234/11X0305_pair1_sorted.bam 1818236 1489267 1027458 0
/commun/data/projects/folder1234/11X0305_markdup.bam 1818236 1489267 1027458 0
/commun/data/projects/folder1234/11X0305_realigned.bam 1818236 1489267 1027458 0
/commun/data/projects/folder1234/11X0542_pair1_sorted.bam 1784860 1469183 1016314 0
/commun/data/projects/folder1234/XX07551_recal.bam 1778358 1456518 1018694 0
/commun/data/projects/folder1234/11X0542_realigned.bam 1784860 1469183 1016314 0
/commun/data/projects/folder1234/11X0542_markdup.bam 1784860 1469183 1016314 0
/commun/data/projects/folder1234/11X0311_pair1_unsorted.bam 1353534 1117463 788474 0
/commun/data/projects/folder1234/11X0291_recal.bam 1296294 1067233 743924 0
/commun/data/projects/folder1234/XX07551_pair1_sorted.bam 1778358 1456518 1018694 0
/commun/data/projects/folder1234/11X0240_pair1_unsorted.bam 1489672 1229654 830042 0
/commun/data/projects/folder1234/XX10272_pair1_unsorted.bam 1579608 1296449 876040 0
/commun/data/projects/folder1234/XX07551_markdup.bam 1778358 1456518 1018694 0
/commun/data/projects/folder1234/XX07551_realigned.bam 1778358 1456518 1018694 0
/commun/data/projects/folder1234/11X0305_pair1_unsorted.bam 1818236 1489267 1027458 0
/commun/data/projects/folder1234/11X0311_recal.bam 1353534 1117463 788474 0
/commun/data/projects/folder1234/11X0456_pair1_unsorted.bam 1486490 1220969 848738 0
/commun/data/projects/folder1234/11X0247_pair1_unsorted.bam 1489832 1233208 862842 0
/commun/data/projects/folder1234/11X0542_pair1_unsorted.bam 1784860 1469183 1016314 0
/commun/data/projects/folder1234/XX07551_pair1_unsorted.bam 1778358 1456518 1018694 0
/commun/data/projects/folder1234/11X0433_pair1_sorted.bam 2178798 1800943 1249738 0
/commun/data/projects/folder1234/11X0433_markdup.bam 2178798 1800943 1249738 0
/commun/data/projects/folder1234/XX10272_recal.bam 1579608 1296449 876040 0
/commun/data/projects/folder1234/11X0433_realigned.bam 2178798 1800943 1249738 0
/commun/data/projects/folder1234/11X0240_recal.bam 1489672 1229654 830042 0
/commun/data/projects/folder1234/11X0290_recal.bam 1436970 1191044 837726 0
/commun/data/projects/folder1234/11X0456_recal.bam 1486490 1220969 848738 0
/commun/data/projects/folder1234/11X0247_recal.bam 1489832 1233208 862842 0
/commun/data/projects/folder1234/11X0433_pair1_unsorted.bam 2178798 1800943 1249738 0
/commun/data/projects/folder1234/11X0305_recal.bam 1818236 1489267 1027458 0
/commun/data/projects/folder1234/11X0542_recal.bam 1784860 1469183 1016314 0
/commun/data/projects/folder1234/11X0433_recal.bam 2178798 1800943 1249738 0
However, I noticed that running this code on the cluster is slower than running it on the master node alone; it is probably I/O-bound, since all the slaves read their BAMs from the same shared filesystem...

The C code

Note: the C code is compiled with ortecc, not gcc.
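(As far as I understand, ortecc is simply one of the compiler wrappers shipped with Open MPI: it adds the right include and library paths, and -lmpi is given explicitly in the link step. mpicc should work as well.)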
SAMDIR=/commun/data/packages/samtools-0.1.18
OPENMPI.dir=/commun/data/packages/openmpi
CC=$(OPENMPI.dir)/bin/ortecc
.PHONY: all
all: ./ompibam
./ompibam: ompibam.c
	$(CC) -o $@ -O3 -I$(SAMDIR) \
		-Wl,-rpath -Wl,$(OPENMPI.dir)/lib \
		-L$(SAMDIR) -Wall $< -lmpi -lbam -lz
/**
 * Author : Pierre Lindenbaum PhD
 * Date : 2013
 * WWW: http://plindenbaum.blogspot.com
 * Motivation:
 * my first Open-MPI source. Takes a list of BAM files
 * and counts the number of reads.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include "sam.h"

/** message sent to a slave: "count the reads in a BAM" */
#define TAG_COUNT_BAM 1
/** message sent to a slave: "shutdown" */
#define TAG_RELEASE_SLAVE 2

/**
 * fixed-size structure sent to the slaves. Contains the BAM path and the counts of reads
 */
typedef struct Params_t
    {
    char filename[FILENAME_MAX];
    int is_error;
    long count_total;
    long count_mapped;
    long count_properly_paired;
    long count_dup;
    } Params;
#define IS_FLAG_SET(b,f) ((b->core.flag & (f))!=0)

/**
 * count the reads in a BAM file using the SAMtools API
 * and fill the structure Params*
 */
static void count_reads(Params* p)
    {
    int ret;
    bam1_t* b;
    bam_header_t* header;
    bamFile fp = bam_open(p->filename,"r");
    if(fp==0)
        {
        fprintf(stderr,"Cannot open %s\n",p->filename);
        p->is_error=1;
        return;
        }
    /* read (and skip over) the header to reach the alignments */
    header = bam_header_read(fp);
    b = bam_init1();
    while((ret = bam_read1(fp,b)) >= 0)
        {
        p->count_total++;
        if(IS_FLAG_SET(b,BAM_FUNMAP)) continue;
        p->count_mapped++;
        if(IS_FLAG_SET(b,BAM_FPROPER_PAIR))
            {
            p->count_properly_paired++;
            }
        if(IS_FLAG_SET(b,BAM_FDUP))
            {
            p->count_dup++;
            }
        }
    bam_destroy1(b);
    bam_header_destroy(header);
    bam_close(fp);
    }
/**
 * This is the slave method: it receives 'Params' messages from the master.
 * If the tag is TAG_RELEASE_SLAVE the slave is released,
 * else the BAM in param.filename is analyzed and the message is sent back to the master.
 */
static void slave(void)
    {
    for(;;)
        {
        MPI_Status status;
        Params param;
        /* Receive a message from the master */
        MPI_Recv(
            &param,
            sizeof(Params),
            MPI_CHAR,
            0,
            MPI_ANY_TAG,
            MPI_COMM_WORLD,
            &status);
        /* the master has no more work for us */
        if(status.MPI_TAG==TAG_RELEASE_SLAVE)
            {
            break;
            }
        count_reads(&param);
        /* Send the result back */
        MPI_Send(
            &param,
            sizeof(Params),
            MPI_CHAR,
            0,
            0,
            MPI_COMM_WORLD
            );
        }
    }
/**
 * This is the master method: while there is
 * a BAM in the queue, send the analysis to the slaves.
 * At the end, release the slaves.
 */
static void master(const int n_bams,char** argv)
    {
    int rank=0;
    int optind=0;
    int proc_count=0;
    /* get the number of available processes in proc_count */
    if(MPI_Comm_size(MPI_COMM_WORLD, &proc_count)!=MPI_SUCCESS)
        {
        return;
        }
    printf("#filename\ttotal\tmapped\tproperly_paired\tdup\n");
    /* loop over the bam files */
    while(optind < n_bams)
        {
        int i=0;
        /* number of messages sent in this round */
        int count_sent=0;
        /* loop over the available procs */
        for(rank=1;
            rank < proc_count && optind+count_sent < n_bams;
            ++rank)
            {
            /* create a new param */
            Params param;
            /* fill with '0' */
            memset((void*)&param,0,sizeof(Params));
            /* set the BAM filename */
            strcpy(param.filename,argv[count_sent+optind]);
            /* Send it to this rank */
            MPI_Send(
                &param, /* message buffer */
                sizeof(Params), /* message length, in bytes */
                MPI_CHAR, /* the struct is sent as raw bytes */
                rank, /* destination process rank */
                TAG_COUNT_BAM, /* user-chosen message tag */
                MPI_COMM_WORLD /* default communicator */
                );
            ++count_sent;
            }
        /* now we get the results */
        for(i=0;i< count_sent;++i)
            {
            /* get a message from a slave */
            Params param;
            MPI_Status status;
            MPI_Recv(&param, /* message buffer */
                sizeof(Params), /* message length, in bytes */
                MPI_CHAR, /* the struct is received as raw bytes */
                MPI_ANY_SOURCE, /* receive from any sender */
                MPI_ANY_TAG, /* any type of message */
                MPI_COMM_WORLD, /* default communicator */
                &status); /* info about the received message */
            if(param.is_error!=0)
                {
                fprintf(stderr,"Error for %s\n",param.filename);
                }
            else /* print the result */
                {
                printf("%s\t%ld\t%ld\t%ld\t%ld\n",
                    param.filename,
                    param.count_total,
                    param.count_mapped,
                    param.count_properly_paired,
                    param.count_dup
                    );
                }
            }
        if(count_sent==0)
            {
            break;
            }
        optind+=count_sent;
        }
    /* shutdown the slaves */
    for(rank=1; rank < proc_count; ++rank)
        {
        Params empty;
        MPI_Send(
            &empty, /* message buffer */
            sizeof(Params), /* message length, in bytes */
            MPI_CHAR, /* the struct is sent as raw bytes */
            rank, /* destination process rank */
            TAG_RELEASE_SLAVE, /* shutdown */
            MPI_COMM_WORLD /* default communicator */
            );
        }
    }
int main(int argc, char *argv[])
    {
    int rank=0;
    /* Initialization */
    MPI_Init(&argc, &argv);
    /* Find out my identity in the default communicator */
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    /* rank is zero: I'm the master */
    if (rank == 0)
        {
        master(argc-1,&argv[1]);
        }
    else /* I'm a slave */
        {
        slave(); /* a slave gets its work from MPI messages, not from argv */
        }
    /* Shut down MPI */
    MPI_Finalize();
    return EXIT_SUCCESS;
    }
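For a quick sanity check outside SGE, the program can also be run locally (the BAM paths below are placeholders); with -np 4, rank 0 acts as the master and the three other processes do the counting:
/commun/data/packages/openmpi/bin/mpirun -np 4 ./ompibam /path/to/test1.bam /path/to/test2.bam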


I'm still very new to this API and to those concepts. Feel free to suggest anything :-)
That's it,
Pierre

1 comment:

Avi Ramu said...

Hi Pierre,
Thanks a ton for this post, I'm going to try this out on my cluster here. I have a tool which iterates through a bcf/vcf file and computes a statistic at each locus. I'll let you know if I can parallelize that using OpenMP !
Thanks again,
Avinash