Reading Notes: MPI and Message Passing Concurrency
I needed to use mpi for a school assignment (that’s way overdue), but I’ve only been able to work on it in bursts, because the holiday season keeps stealing my attention, so I need to write everything down or I’ll forget.
The things I write here may not be universally true, but they hold for the way I’m going to use Openmpi. If you want more, this set of very short tutorials is one of the best resources I’ve found. I initially dismissed them because I misunderstood what they were, but they’re a great reference.
Basic Concepts
It is important to note that mpi is not an implementation – it is a low-level interface designed to build libraries upon. Its raison d’être is that it is expensive to develop supercomputing libraries tailored to each specific high-performance number-crunching cluster system. Instead, libraries are built on mpi, which is then implemented on the high-performance clusters. So mpi is sandwiched between domain-specific libraries and technology-specific cluster implementations. That said, you can use mpi directly, and it’s not too inconvenient to do so. As far as I have been able to tell, there are two major architectural patterns supported by mpi:
- Hierarchical
- This is where you have one root process (often with rank 0) coordinating the work of several worker processes.
- Decentralised
- This is when all processes are independent and perform the same computations, with some clever coordination between them. This is the pattern that was the best fit for my assignment.
The mpi runtime (mpiexec) then launches several identical instances of your program, and as the code runs, the instances will eventually execute the same call to, e.g., MPI_Allgather. When they do, the mpi implementation does some magic behind the scenes, and when execution moves on, all instances have a copy of the data on all other instances. If one instance crashes, it brings all the other instances down with it. So while instances are distinct, they operate as one unit.
To ensure necessary code is run, mpi requires initialisation (with MPI_Init(&argc, &argv)) and finalisation (with MPI_Finalize()).
I should also say that mpi processes are split into independent communication units. So a program consists of communication units, which consist of processes. (The code can then specify an informal hierarchy over these processes, but that’s not required or necessarily desired.) The default unit, which all new processes are members of, is called MPI_COMM_WORLD.
The number of processes in a unit can be read with MPI_Comm_size(comm, &size). (Here is where you may, for example, want comm to be MPI_COMM_WORLD.) The communication unit-unique id of the current process is called its rank and can be read with MPI_Comm_rank(comm, &rank). (A process can be a member of multiple communication units, in which case its rank may be different in some or all of them.)
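Putting the above together, a minimal mpi program looks something like this. This is my own sketch, not code from the assignment; compile with mpicc and launch with something like mpiexec -n 4 ./a.out.

```c
/* Minimal mpi lifecycle sketch: initialise, read the size of
 * MPI_COMM_WORLD and this process's rank within it, then finalise. */
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);

    int size, rank;
    MPI_Comm_size(MPI_COMM_WORLD, &size);  /* processes in the unit */
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);  /* this process's id in it */

    printf("I am rank %d of %d\n", rank, size);

    MPI_Finalize();
    return 0;
}
```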
Basic Operations
MPI_Send(start, count, type, rank, tag, comm)
- Send message (blocking operation), where
  - start is a pointer to a buffer
  - count is how many elements to send from the buffer
  - type is the data type to read, which can be, e.g.,
    - primitive types like MPI_INT or MPI_DOUBLE_PRECISION
    - an array of primitive types
    - custom datatypes (e.g. an array of pairs), definable with mpi functions
  - rank and comm identify the recipient
  - tag allows the recipient to listen for particular types of message
MPI_Recv(start, count, type, rank, tag, comm, status)
- Receive message (blocking), with the same arguments as MPI_Send plus one addition:
  - status is a pointer to an MPI_Status object, which will contain things like who sent the message and what it was tagged with
  - count specifies how many elements we will accept, not how many we expect!
  - rank can be ignored by asking for messages from MPI_ANY_SOURCE
  - tag can be ignored by asking for messages tagged MPI_ANY_TAG
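As a sketch of the blocking pair in action (my own example, assuming the program is launched with at least two processes): rank 0 sends one int to rank 1 with tag 42, and rank 1 receives it, ignoring sender and tag.

```c
/* Blocking send/receive sketch between rank 0 and rank 1. */
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);

    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0) {
        int payload = 1234;
        /* start=&payload, count=1, type=MPI_INT, dest=1, tag=42 */
        MPI_Send(&payload, 1, MPI_INT, 1, 42, MPI_COMM_WORLD);
    } else if (rank == 1) {
        int payload;
        MPI_Status status;
        /* Accept at most one int, from any sender, with any tag. */
        MPI_Recv(&payload, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG,
                 MPI_COMM_WORLD, &status);
        printf("got %d from rank %d (tag %d)\n",
               payload, status.MPI_SOURCE, status.MPI_TAG);
    }

    MPI_Finalize();
    return 0;
}
```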
MPI_Barrier(comm)
- When a process in a communication unit reaches the barrier, it blocks until all other processes in the same communication unit have called the barrier procedure. This can be used to create critical sections (I used this to make i/o thread safe) by looping through all rank numbers, having a barrier at the end of each iteration, and only letting the process whose rank matches the current iteration perform the critical operation.
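The rank-ordered critical section described above can be sketched like this (my own minimal version, with printing standing in for the critical operation):

```c
/* Each process takes its turn at the critical operation in rank order,
 * with a barrier at the end of every iteration. */
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);

    int size, rank;
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    for (int turn = 0; turn < size; turn++) {
        if (turn == rank)
            printf("rank %d reporting in\n", rank);  /* critical section */
        MPI_Barrier(MPI_COMM_WORLD);  /* everyone waits for this turn */
    }

    MPI_Finalize();
    return 0;
}
```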
Combined Operations
The mpi interface also specifies various abstractions built upon the basic send/receive. These have been useful for my assignment:
MPI_Bcast(…)
- Sends a value to all processes. You specify the rank of a root process and a buffer. The value in the buffer of the root process gets filled into the buffers of the other processes in the same communication unit.
MPI_Scatter(…)
- If a root process possesses a collection of values which can be split into sub-collections and processed individually, calling MPI_Scatter with the root process specified will divide the collection among the available processes in the communication unit.
MPI_Gather(…)
- In the opposite scenario, assemble sub-collections into a large collection in the root process.
MPI_Allgather(…)
- Does the same as the above but assembles the large collection in all processes, not just the root process. Imagine it like gathering to the root process and then broadcasting the result to all other processes.
MPI_Reduce(…)
- If, on the other hand, you want to combine the elements of the collection (where “combine” can be, e.g., computing the total sum of the elements, or the logical and of all elements) and that’s why you’re thinking about gathering them to a root process, you can instead use MPI_Reduce, which will do this combining operation efficiently.

MPI_Allreduce(…)
- If you want the result of this combination operation to be available to all processes, MPI_Allreduce is what you want. Again, you can imagine it like combining into a root process and then broadcasting the result to all other processes. In the end, I used MPI_Allreduce almost exclusively for my assignment, because it was part of the assignment that each process should individually determine the result, which meant I always wanted all results to be shared.
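Since MPI_Allreduce is the one I leaned on, here is a sketch of it (my own example): every process contributes its rank, and every process ends up with the sum of all ranks, no root required.

```c
/* MPI_Allreduce sketch: combine one int per process with MPI_SUM, and
 * deliver the result to every process. */
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);

    int size, rank;
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    int sum = 0;
    /* sendbuf=&rank, recvbuf=&sum, one MPI_INT, combined with MPI_SUM */
    MPI_Allreduce(&rank, &sum, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);

    /* With n processes, every rank now holds 0 + 1 + ... + (n-1). */
    printf("rank %d sees sum %d\n", rank, sum);

    MPI_Finalize();
    return 0;
}
```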
Caveats
One problem I ran into several times when writing mpi code was asymmetric execution of processes. What I mean is that each mpi process should, essentially, execute the exact same code. The reason for this is that if not all processes in a communication unit call, e.g., MPI_Allreduce, the processes that did call it will hang waiting to rendezvous with the ones that did not.
If no condition in the code depends on the rank of the running process, this will not be a problem. However, sometimes you need to branch based on rank. When you do, you need to be very careful that all code paths call the same mpi procedures.
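To illustrate (a sketch of my own): the branch on rank below is safe precisely because both paths converge on the same MPI_Allreduce call; if either path skipped it, the other processes would block forever.

```c
/* Rank-dependent branching done safely: every code path reaches the
 * same collective call. */
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);

    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    int contribution;
    if (rank == 0) {
        contribution = 100;   /* rank 0 does something special... */
    } else {
        contribution = rank;  /* ...but both paths set up the same call */
    }

    int total;
    /* All processes call this, whichever branch they took above. */
    MPI_Allreduce(&contribution, &total, 1, MPI_INT, MPI_SUM,
                  MPI_COMM_WORLD);

    printf("rank %d: total %d\n", rank, total);
    MPI_Finalize();
    return 0;
}
```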