OpenMPI/Rmpi

Sul cluster è istallata le libreria OpenMPI per gestire la distribuzione di processi sul cluster. Rmpi è la libreria di R che si interfaccia con OpenMPI per sfruttarne i servizi.

Per meglio sfruttare le capacità di parallelizzazione dei processori più moderni OpenMPI è stato compilato con delle opzioni di ottimizzazione più stringenti; ne consegue che il software che coinvolge questa libreria non può essere eseguito sui nodi più vecchi. Bisogna quindi specificare nella direttiva PBS il tipo di processori che si desiderano aggiungendo l’apposito tag e52630.
Es:

#PBS -l nodes=8:e52630:ppn=2

Rmpi è attualmente compilato con OpenMPI 3.1.3 e, per poterlo utilizzare bisogna caricarne il modulo.

Esecuzione di un job in OpenMPI

Il modo più semplice di utilizzare OpenMPI è eseguire lo stesso job su più nodi sfruttando il runner della libreria.

In questo caso non si fa ricorso al binder in R (Rmpi) ed è applicabile quindi a qualsiasi eseguibile. Sfruttando informazioni dell’environment o utilizzando le molte opzioni di mpiexec/mpirun è possibile gestire i parametri passati alle varie istanze del processo e rendere questo esempio utile in pratica.

Vediamo l’esempio di uno script Python:

#!/bin/bash

#PBS -N Es1OpenMPI

#PBS -l nodes=3:e53630:ppn=2

#PBS -l walltime=00:01:00

#PBS -m abe -M nome.cognome@uniroma1.it

#PBS -j oe

cd $PBS_O_WORKDIR

module load python

module load openmpi

mpiexec --hostfile $PBS_NODEFILE python3 host.py

In cui, in particolare, viene specificato la tipologia dei nodi (e53630) e il job viene eseguito attraverso mpiexec. Una volta caricato il modulo man mpiexec permetterà di conoscere le varie opzioni del comando.

La variabile di ambiente PBS_NODEFILE contiene il path di un file in cui Torque scrive l'elenco dei nodi in cui il job corrente ha slot assegnati, ed è necessario ad mpiexec per coordinarsi con Torque. Viene generato automaticamente e non è necessario fare altro che passarne il path all’ eseguibile, come nell’esempio.

Se lo script host.py contiene

import socket

print(socket.gethostname())

si avrà in output l'elenco dei nodi in cui viene eseguito. Nello specifico, avendo richiesto due processi (ppn=2) per ogni nodo, ogni host apparirà due volte.

Stesso risultato si otterrebbe con uno script Bash contenente:

hostname

o con uno script R contenente la riga

paste( Sys.info()[c("nodename")])

In quest'ultimo caso la riga di esecuzione del job diventa

mpiexec --hostfile $PBS_NODEFILE R --no-save -f host.R

Utilizzo di Rmpi per esecuzione di processi worker

In questo esempio vedremo come, attraverso OpenMpi ed Rmpi è possibile delegare l’esecuzione di parte dei calcoli a dei processi worker attivati sul cluster.

Per questo esempio cambiamo la meccanica di esecuzione; mentre nell’esempio precedente abbiamo delegato ad mpirun l’esecuzione di processi multipli, in questo caso utilizzeremo mpirun per eseguire un processo padre in ambiente OpenMPI e sfrutteremo la libreria Rmpi per controllare i processi worker. Ora, come nell’esempio precedente dovremo comunque dichiarare a Torque il numero di processi che andremo ad eseguire per permettergli di riservare le risorse necessarie.

Il nuovo script di descrizione del job è:

#!/bin/sh

#PBS -l nodes=4:e52630:ppn=4

#PBS -l walltime=00:10:00

#PBS -m abe -M nome.cognome@uniroma1.it

#PBS -j oe

#PBS -N testRmpi

module load R/3.4.1

module load openmpi/3.1.3

mpirun --hostfile ${PBS_NODEFILE} -np 1 R --no-save -f bench_integral.R

notare che mpirun si specifica che inizialmente si lancia un solo processo (-np 1).

Come job di esempio andiamo ad eseguire in parallelo l'integrazione numerica di una funzione variando il numero di worker e confrontando i tempi di esecuzione.

library("Rmpi")

nStep=1000000000

startX=1

stopX=13

toBeIntegrated <- function (x) {

   1/sqrt(x)

}

integral <- function(indice,startx,stopx,nStep,numSlaves,fun){

   intervallo=(stopx-startx)/numSlaves

   start=startx+(indice-1)*intervallo

   stop=startx+indice*intervallo

   step=(stopx-startx)/nStep*numSlaves

   sum(fun(seq(from=start, to=stop, by=step))*step)

}

for( numSlaves in seq(1,8) )

{

   if( numSlaves == 1 ){

       startTime=Sys.time()

       result=integral(1,startX,stopX,nStep,1,toBeIntegrated)

       stopTime=Sys.time()

       misure=data.frame("Number Slaves"=1,"Execution Time"=stopTime-startTime,       
        "Result"=result)

   } else {

       mpi.spawn.Rslaves(nslaves=numSlaves)

       startTime=Sys.time()

       result=0

       results <- mpi.iapplyLB(1:numSlaves, integral, startX, stopX, nStep,

numSlaves, toBeIntegrated)

       for( ele in results ) result = result+ ele

       stopTime=Sys.time()

       misure <- rbind(misure,data.frame("Number Slaves"=numSlaves,

"Execution Time" = difftime(stopTime, startTime,

units='secs'),"Result"=result))

       mpi.close.Rslaves()

   }

}

misure

mpi.finalize()

Ai fini della parallelizzazione del calcolo i punti importanti sono il caricamento della libreria Rmpi

library("Rmpi")

l’avvio dei processi che dovranno eseguire le operazioni

mpi.spawn.Rslaves(nslaves=numSlaves)

Per confrontare l’effetto dell’esecuzione di un numero diverso di worker nella descrizione del job abbiamo riservato risorse sufficienti al massimo necessario. Dato che le risorse riservate non sono disponibili per altri utenti è opportuno riservare quelle realmente sfruttate.

La funzione mpi.iapplyLB per distribuire il carico sui vari processi slave

results <- mpi.iapplyLB(1:numSlaves, integral, startX, stopX, nStep, numSlaves, toBeIntegrated)

le operazioni di chiusura dell'ambiente mpi

mpi.close.Rslaves()

mpi.finalize()

di cui, in un job reale, sarebbe opportuno garantirsi l’esecuzione, ad esempio tramite la funzione .Last.

Altro esempio per Rmpi

In questo esempio vengono mostrate gli elementi più interessanti messi a disposizione dalla lbreria Rmpi.

library(Rmpi)

nslaves = mpi.universe.size() - 1  # leftout at least a CPU for the master process

print(nslaves)

mpi.spawn.Rslaves(nslaves=nslaves)

# utility functions

mpi.is.master()  # Am I the master?

mpi.hostinfo()  # returns host name, rank, size and communicator number for the current CPU

slave.hostinfo()  # as mpi.hostinfo() but for all master and slaves

mpi.setup.rngstream(123)  # set the RNG with the specified seed on all the slaves

# There exist several parallel versions of the R function 'apply':

# mpi.parApply, applies a function in parallel. MARGIN can be 1 (rows), 2 (cols) or c(1,2) (both)

# mpi.parLapply, parallel lapply function, returns a list

# mpi.parSapply, parallel sapply function, returns a simplified result

# mpi.parRapply, parApply with MARGIN=1

# mpi.parCapply, parApply with MARGIN=2

# mpi.parReplicate, parallel replicate function

# mpi.parMM, parallel matrix product

mpi.parRapply(matrix(c(1,2,3,4,5,6,7,8,9), nrow=3, byrow=T), mean)

# these functions have also a non-blocking version (eg. mpi.iparApply, mpi.iparLapply, etc..), with which the master process can continue the computations while the slaves work

mpi.parSim(n=10, rand.gen=rnorm, statistic=mean, nsim=10) # parallel Monte Carlo Simulation

# if the statistic returns a single value, the result returns a vector of size: N=nslaves * sim, while if the value is a vector of length l, the result is a matrix of size (l, N)

mpi.remote.exec(rnorm, 5)  # execute a command on R slaves

xx = 1:10

# we can move a general R object from the master to the slaves, so that they can use it for their computations, with the following function

mpi.bcast.Robj2slave(xx)

mean2 = function(x) {

 sum(x) / length(x)

}

# if you want to move only all the master's functions to the slaves, use the following

mpi.bcast.Rfun2slave()

mpi.parReplicate(10, mean2(xx))

mpi.close.Rslaves()

mpi.finalize()

# MPI session terminated, it is possible continuing with other tasks