\subsection{Message Passing Interface (\textsc{MPI})} \subsubsection{Allgemeines} \begin{itemize} \item Message-Passing--Programme: \begin{itemize} \item in sequentieller Sprache geschrieben \item Variablen sind prozesslokal \item Start auf vielen Prozessoren/Knoten \item unterschiedliches Verhalten abhängig von der Prozessnummer \item Kommunikation durch explizite, zweiseitige Bibliotheksaufrufe (etwa MPI) \end{itemize} \item MPI ist ein Standard für C und Fortran, der Funktionssignaturen und Semantik festlegt \item verschiedene Implementierungen wie \textsc{OpenMPI} oder \textsc{Intel MPI} \end{itemize} \subsubsection{Operationen} \begin{itemize} \item Eigenschaften einer Nachricht: \begin{itemize} \item Senderkennung \item Adresse der Quelldaten \item Datentyp (primitiv, zusammengesetzt, etc.) \item Anzahl der Datenelement jeweils bei Sender und Empfänger \item Empfängerkennung (einer oder mehrere) \item Adresse der Zieldaten \end{itemize} \item blockierend: alle Puffer der Operation können direkt nach Rückkehr des Aufrufs wiederverwendet werden \item nicht-blockierend: Puffer erst nach Abschluss der Operation wieder verwendbar \item lokal: kehrt immer zurück, unabhängig von Aktionen anderer Prozesse \item nicht-lokal: kehrt erst zurück, wenn in anderem Prozess bestimmte Bedingung auftritt \item \lstinline|MPI_Init()|: Initialisieren der MPI-Strukturen \item \lstinline|MPI_Finalize()|: Abschluss aller MPI-bezogenen Operationen und Freigabe der MPI-Strukturen \item \lstinline|MPI_Comm_rank(MPI_Comm comm, int *rank)|: Bestimmung des Rangs im übergebenen Kommunikator \item \lstinline|MPI_Comm_size(MPI_Comm comm, int *size)|: Bestimmung des Größe des übergebenen Kommunikators \item \lstinline|MPI_Send(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)|: Senden einer Nachricht (Verhalten implementierungsabhängig) \item \lstinline|MPI_Ssend(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)|: synchrones Senden (nicht-lokal, blockierend), kehrt nach Empfangsaufruf in anderem Prozess zurück \begin{itemize} \item Übertragung größerer Datenmengen unterschiedlicher Größen \end{itemize} \item \lstinline|MPI_Bsend(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)|: gepuffertes Senden (lokal, blockierend), kehrt nach Kopieren der Nachricht zurück: \begin{itemize} \item Vermeidung von Blockierungsgefahr \item Überlappung von Berechnungen mit Kommunikation (aber dafür öfter Kopieren!) \item Bereitstellung ausreichend großer Puffer am Programmanfang notwendig mittels \lstinline|MPI_Buffer_attach(void *buffer, size_t size)| und \lstinline|MPI_Pack_size(int count, MPI_Datatype type, MPI_Communicator comm, int *size)| \end{itemize} \item \lstinline|MPI_Rsend(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)|: sofortiges Senden, erwartet dass Empfangsauftrag bereits abgesetzt wurde: \begin{itemize} \item Versenden vieler kleiner Nachrichten \item Überlappung Kommunikation mit Berechnung nicht gewünscht \item garantierter Empfangsauftrag notwendig (z.B. am Anfang mit \texttt{MPI\_Irecv} bereitstellen) \end{itemize} \item \lstinline|MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status status)|: Empfang \item \lstinline|MPI_Sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype, int dest, int sendtag, void *recvbuf, int recvcount, MPI_Datatype recvtype, int source, int recvtag, MPI_Comm comm, MPI_Status status)|:\\ gleichzeitiges Senden und Empfangen \item gleichbleibende Operationen (Typ, Größe, Tag, Empfänger, Puffer, \textellipsis): \begin{itemize} \item \lstinline|MPI_Send_Init(void *buf, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm, MPI_Request *request)|: Anlegen des Sendeauftrags \item \lstinline|MPI_Start(MPI_Request *request)|: Starten des Sendeauftrags \item \lstinline|MPI_Request_free(MPI_Request *request)|: Freigeben des Sendeauftrags \end{itemize} angelegt und durch \item nicht-blockierende Varianten \lstinline|MPI_I...| (lokal) geben \lstinline|MPI_Request| zurück \begin{itemize} \item Überlagerung von Berechnungen mit Kommunikation möglich \item gleichzeitiges Empfangen von verschiedenen Nachrichten möglich \end{itemize} \item \lstinline|MPI_Wait(MPI_Request *request, int *flag, MPI_Status *status)|: blockierendes Warten auf Ende des übergebenen Requests \item \lstinline|MPI_Test(MPI_Request *request, int *flag, MPI_Status *status)|: Testen auf Ende des übergebenen Requests \item \lstinline|MPI_Probe(int source, int tag, MPI_Comm comm, MPI_Status *status)|: blockierender Empfangstest \item \lstinline|MPI_Get_count(MPI_Status *status, MPI_Datatype datatype, int *count)|: Länge der zu empfangenen Nachricht (kann zur Allokation von Puffern genutzt werden) \end{itemize} \subsubsection{Kollektive Operationen} \begin{itemize} \item müssen von allen Prozessen des Kommunikators in der gleichen Reihenfolge aufgerufen werden \item Synchronisation durch \lstinline|MPI_Barrier(MPI_Comm comm)|: Fortsetzung erst dann, wenn alle Prozesse Aufruf getätigt haben \item Kommunikation: \begin{itemize} \item \lstinline|MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm)|: Versenden bzw. Empfangen einer Nachricht von einem Knoten an alle anderen \item \lstinline|MPI_Scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)|: elementweises Verteilen eines Arrays an alle Prozesse des Kommunikators \item \lstinline|MPI_Gather(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)|: Aufsammeln von Datenelementen von allen Prozessen und Speicherung als Array in einem Prozess \item \lstinline|MPI_Allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm)|: Aufsammeln von Datenelementen von allen Prozessen und Speicherung als Array in jedem Prozess (Transponieren der Arrays über die Nodes) \end{itemize} \item Reduktion: \begin{itemize} \item \lstinline|MPI_Reduce(void *data, void *result, int count, MPI_Datatype datatype, MPI_Op operator, int root, MPI_Comm comm)|: Globale Reduktion auf verteilten Daten \item \lstinline|MPI_Allreduce(void *data, void *result, int count, MPI_Datatype datatype, MPI_Op operator, MPI_Comm comm)|: Globale Reduktion auf verteilten Daten mit Ergebnis auf jedem Knoten \item \lstinline|MPI_Scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op operator, MPI_Comm comm)|: Globale Reduktion, jeder Node erhält Ergebnis bis einschließlich sich selbst \end{itemize} \end{itemize} \subsubsection{Kommunikatoren} \begin{itemize} \item bestehen aus \begriff{Prozessgruppe} und \begriff{Nachrichtenkontext} \item erlauben die Kommunikation von Bibliotheken, ohne die des eigentlichen Programms zu beeinflussen \item \lstinline|MPI_Comm_create(MPI_Comm comm, MPI_Group group, MPI_Comm *newcomm)|: Erzeugung eines neuen Kommunikators aus einer Gruppe \item \lstinline|MPI_Comm_split(MPI_Comm comm, int color, int key, MPI_Comm *newcomm)|: Erzeugung neuer Kommunikatoren durch Gruppierung nach \texttt{color}, Rangbestimmung durch Sortieren von \texttt{key} \item Mengenoperationen auf Gruppen \item virtuelle Topologien: Abbildung von Prozessnummern auf einen Namensraum: \begin{itemize} \item Teil des Nachrichtenkontexts \item Graph:\begin{itemize} \item \lstinline|MPI_Graph_create(MPI_Comm comm, int nnodes, int index[], int edges[], int reorder, MPI_Comm *newcomm)|: Erstellung neues Kommunikators aus Kanten (\texttt{index} gibt pro Knoten die erste Kante in \texttt{edges} an) \item \lstinline|MPI_Graph_neighbors_count(MPI_Comm comm, int rank, int *count)|: Bestimmung der Anzahl der Nachbarn von \texttt{rank} \item \lstinline|MPI_Graph_neighbors(MPI_Comm comm, int rank, int maxneighbors, int *neighbors)|: Bestimme Ränge der Nachbarn von \texttt{rank} \end{itemize} \item kartesische Koordinaten:\begin{itemize} \item \lstinline|MPI_Cart_create(MPI_Comm comm, int ndims, int dims[], int periods[], int reorder, MPI_Comm *newcomm)|: Erzeugung eines \texttt{ndims}-dimensionalen Gitters \item \lstinline|MPI_Cart_coords(MPI_Comm comm, int rank, int ndims, int coords[])|: Abbildung von Rang auf Koordinaten \item \lstinline|MPI_Cart_rank(MPI_Comm comm, int coords[], int *rank)|: Abbildung von Koordinaten auf Rang \item \lstinline|MPI_Cart_shift(MPI_Comm comm, int direction, int disp, int *rank_src, int *rank_dest)|: Ermittlung der Ränge benachbarter Knoten im Gitter \end{itemize} \end{itemize} \end{itemize} \subsubsection{Datentypen} \begin{itemize} \item primitive Datentypen für Ganz- und Fließkommazahlen \item abgeleitete Datentypen:\begin{itemize} \item allgemein: $((\mathrm{type}_0, \mathrm{disp}_0), \dots, (\mathrm{type}_{n-1}, \mathrm{disp}_{n-1}))$ \item \texttt{struct}: \lstinline|MPI_Type_struct(int count, int blocklengths[], MPI_Aint displacements[], MPI_Datatype types[], MPI_Datatype *newtype)| \item zusammenhängender Speicher: \lstinline|MPI_Type_contiguous(int count, MPI_Datatype oldtype, MPI_Datatype *newtype)| \item Vektor: \lstinline|MPI_Type_vector(int count, int blocklength, int stride, MPI_Datatype oldtype, MPI_Datatype *newtype)| \item \lstinline|MPI_Type_commit(MPI_Datatype *type)|: Festlegen des Datentyps \end{itemize} \end{itemize} \subsubsection{MPI-2} \begin{itemize} \item dynamisches Starten von Prozessen \item Socket-Programmierung \item nicht-blockierende kollektive Operationen \item Remote Memory Access mit Speicherfenstern: \begin{itemize} \item \lstinline|MPI_Win_create (void *base, int size, int disp_unit, MPI_Info info, MPI_Comm comm MPI_Win *win)|: kollektives Erstellen eines Fensters \item \lstinline|MPI_Put(void *buffer, int or_count, MPI_Datatype or_type, int ta_rank, MPI_Aint ta_disp, int ta_count, MPI_Datatype ta_type, MPI_Win win)|: Schreiben in entferntes Speicherfenster \item \lstinline|MPI_Win_fende(MPI_Win win)|: Synchronisation bezüglich des Fensters \item \lstinline|MPI_Accumulate(void *buffer, int or_count, MPI_Datatype or_type, int ta_rank, MPI_Aint ta_disp, int ta_count, MPI_Datatype ta_type, MPI_Op op, MPI_Win win)|: wie \texttt{MPI\_Reduce} auf entfernten Speicherfenstern \end{itemize} \item Multi-Threading-Unterstützung mit \lstinline|MPI_Init_thread(int **argc, char ***argv, int required, int *provided)|:\begin{itemize} \item \texttt{MPI\_THREAD\_SINGLE}: es existiert pro Prozess nur ein einzelner Thread \item \texttt{MPI\_THREAD\_FUNNELED}: mehrere Threads können pro Prozess existieren, aber nur einer tätigt MPI-Aufrufe \item \texttt{MPI\_THREAD\_SERIALIZED}: mehrere Threads können pro Prozess existieren, aber nur einer tätig MPI-Aufrufe gleichzeitig \item \texttt{MPI\_THREAD\_MULTIPLE}: mehrere Threads können gleichzeitig MPI-Aufrufe tätigen \end{itemize} \end{itemize} \subsubsection{Implementierung} \begin{itemize} \item Kommunikation über UNIX-Sockets (aber relativ teuer; besser: Userspace-Kommunikation) \item Verbindungsaufbau vollvermaschtes Netz von $n$ Nodes: Nodes $i+1$ bis $n$ verbinden sich mit Knoten $i$ \item Header notwendig für Filterung und Allokation von Empfangspuffer: \begin{lstlisting} struct header { int num_bytes; int from; int to; int tag; char buffer[PAYLOAD_SMALL_SIZE]; } \end{lstlisting} \item Versenden von Header und Payload mit \lstinline|int writev(int fd, iovec *iov, int iovcnt)|/\lstinline|int readv(int fd, iovec *iov, int iovcnt)| \item Empfangen von Nachrichten von mehreren Hosts gleichzeitig: \texttt{select}/\texttt{poll}/\texttt{epoll} \item Sperren mit Mutexen für Multithreading-Unterstützung beim Senden \item Synchronisierte Warteschlange für Multithreading-Unterstützung beim Empfangen \item zusätzliche Operation \texttt{probe} blockiert, bis Paket vorliegt \item Vermeidung von Kopieren großer Payloads durch Warten bis Empfänger bereit ist, dann ist keine Pufferung auf Empfängerseite notwendig: \texttt{sync\_send} \item Implementierung von Multi- und Broadcast durch Gruppen in IP-Netzen (Verwaltung durch IGMP) bei UDP \item kleine Payloads direkt in Header integrierbar \item Verwendung alternativer Protokolle statt TCP:\begin{itemize} \item UDP bringt performance-technisch Nachteile \item SCTP ab geringen Paketverlusten besser \end{itemize} \item optimierte Topologie für kollektive Operationen ausnutzen: \begin{itemize} \item jeder Knoten baut eigenen minimalen Spannbaum auf \item ringförmiges Weiterleiten der Puffer \item Broadcasts nur an Subnetze sowie einen Host in anderen Subnetzen für Weiterleitung \end{itemize} \end{itemize}