-
Notifications
You must be signed in to change notification settings - Fork 70
/
Copy pathparallel-processing.tex
1007 lines (835 loc) · 37.4 KB
/
parallel-processing.tex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
\chapter[Parallel Processing]{Parallel Processing \label{chap:parallel}}
%\setcounter{page}{1}
\renewcommand{\thefootnote}{\fnsymbol{footnote}}
\footnotetext{Lecture notes by John Schneider. {\tt
parallel-processing.tex}}
The FDTD method is said to be ``trivially parallelizable,'' meaning
that there are several simple ways in which the algorithm can be
divided into tasks that can be executed simultaneously. For example,
in a 3D simulation one might write an FDTD program that simultaneously
updates the $E_x$, $E_y$, and $E_z$ components of the electric field.
These updates depend on the magnetic field and previous values of
themselves---they are not a function of each other and hence can be
updated in parallel. Then, $H_x$, $H_y$, and $H_z$ might be updated
simultaneously. Alternatively, one might divide the computational
domain into distinct, non-overlapping regions and assign different
processors to update the fields in those regions. This way fields in
each of the regions could be updated simultaneously.
Here we will examine two approaches to parallelizing a program. The
threading approach typically uses one computer to run a program. A
threaded program is designed in such a way as to split the total
computation between two or more ``threads.'' If the computer has
multiple processors, these threads can be executed simultaneously.
Each of the threads can share the same memory space, i.e., the same
set of variables. An alternative approach to parallelization uses the
Message Passing Interface (MPI) protocol. This protocol allows
different computers to run programs that pass information back and
forth. MPI is ideally suited to partitioning the computational domain
into multiple non-overlapping regions. Different computers are used
to update the fields in the different regions. To update the fields
on the interfaces with different regions, the computers have to pass
information back and forth about the tangential fields along those
interfaces.
In this chapter we provide some simple examples illustrating the use
of threads and MPI.
\section{Threads}
There are different threading packages available. Perhaps the most
common is the POSIX threads (pthreads) package. To use pthreads, you
must include the header file {\tt pthread.h} in your program. When
linking, you must link to the pthread library (which is accomplished
with the compiler flag {\tt -lpthread}).
There are many functions related to pthreads. On a UNIX-based system
on which pthreads are installed, a list of these functions can
typically be obtained with the command {\tt man -k pthread} and then
one can see the individual man-pages to obtain details about a
specific function.
Despite all these functions, it is possible to do a great deal of
useful programming using only two functions: {\tt pthread\_create()}
and {\tt pthread\_join()}. As the name implies, a thread is created
by the function {\tt pthread\_create()}. You can think of a thread as
a separate process that happens to share all the variables and memory
with the rest of your program. One of the arguments of {\tt
pthread\_create()} will specify what this thread should do,
specifically what function it should run.
The prototype of {\tt pthread\_create()} is:
\begin{lstlisting}
int pthread_create(
pthread_t *thread_id, // ID number for thread
const pthread_attr_t *attr, // controls thread attributes
void *(*function)(void *), // function to be executed
void *arg // argument of function
);
\end{lstlisting}
The first argument is a pointer to the thread identifier (which is
simply a number but we do not actually care about the details of how
the ID is specified). This ID is set by {\tt pthread\_create()},
i.e., one would typically be interested in the returned value---it is
not something that is set prior to {\tt pthread\_create()} being
called.
The second argument is a pointer to a variable that controls the
attributes of the thread. In this case, the value of this variable is
established prior to the call of the function. This pointer can be
set to {\tt NULL} in which case the thread is created with the default
attributes. Attributes control things like the ``joinability'' of the
thread and the scheduling of threads. Typically one can simply use
the default settings. The {\tt pthread\_t} and {\tt pthread\_attr\_t}
data types are defined in {\tt pthread.h}.
The third and fourth arguments specify what function the thread should
call and what argument should be passed to the function. Notice that
the prototype says the function takes a void pointer as an argument
and returns a void pointer. Keep in mind that ``void'' pointers are,
in fact, simply generic pointers to memory. We can typecast these
pointers to what they actually are. Thus, in practice, it would be
perfectly acceptable for the function to take, for example, an
argument of a pointer to a structure and return a pointer to a double.
One would merely have to do the appropriate typecasting. If the
function does not take an argument, the fourth argument of {\tt
pthread\_create()} is set to {\tt NULL}.
Once a new thread is created using {\tt pthread\_create()}, the
program continues execution at the next command---the program does not
wait for the thread to complete whatever the thread has been assigned
to do. The function {\tt pthread\_join()} is used to block further
execution of commands until the specified thread has completed. {\tt
pthread\_join()} can also be used to access the return-value of the
function that was run in a thread. The prototype of {\tt
pthread\_join()} is:
\begin{lstlisting}
int pthread_join(
pthread_t thread_id, // ID of thread to "join"
void **value_pntr // address of function's return value
);
\end{lstlisting}
\section{Thread Examples}
To demonstrate the use of pthreads, let us first consider a standard
serial implementation of a program where first one function is called
and then another is called. The program is shown in Program
\ref{pro:serialExamp}.
\begin{program}
{\tt serial-example.c}: Standard serial implementation of a program
where first one function is called and then another. (These functions
are merely intended to perform a lengthy calculation. They do not do
anything particularly useful.) \label{pro:serialExamp}
\codemiddle
\begin{lstlisting}
/* serial (i.e., non-threaded) implementation */
#include <stdio.h>
void func1();
void func2();
double a, b; // global variables
int main() {
func1(); // call first function
func2(); // call second function
printf("a: %f\n", a);
printf("b: %f\n", b);
return 0;
}
/* do some lengthy calculation which sets the value of the global
variable "a"*/
void func1() {
int i, j;
for (j=0;j<4000;j++)
for (i=0;i<1000000;i++)
a = 3.1456*j+i;
return;
}
/* do another lengthy calculation which happens to be the same as
done by func1() except here the value of global variable "b" is set
*/
void func2() {
int i, j;
for (j=0;j<4000;j++)
for (i=0;i<1000000;i++)
b = 3.1456*j+i;
return;
}
\end{lstlisting}
\end{program}
In this program the functions {\tt func1()} and {\tt func2()} do not
take any arguments nor do they explicitly return any values. Instead,
the global variables {\tt a} and {\tt b} are used to communicate
values back to the main function. Neither {\tt func1()} nor {\tt
func2()} are intended to do anything useful. They are merely used to
perform some lengthy calculation. Assuming the executable version of
this program is named {\tt serial-example}, the execution time can
be obtained, on a typical UNIX-based system, by issuing the command
``{\tt time serial-example}''.
Now, let us consider a threaded implementation of this same program.
The appropriate code is shown in Program \ref{pro:threadsExamp1}.
\begin{program}
{\tt threads-example1.c}: A threaded implementation of the program
shown in Program \ref{pro:serialExamp}.
\label{pro:threadsExamp1}
\codemiddle
\begin{lstlisting}
/* threaded implementation */
#include <stdio.h>
#include <pthread.h>
void *func1();
void *func2();
double a, b;
int main() {
pthread_t thread1, thread2; // ID's for threads
/* create threads which run in parallel -- one for each function */
pthread_create(&thread1, NULL, func1, NULL);
pthread_create(&thread2, NULL, func2, NULL);
/* wait for first thread to complete */
pthread_join(thread1,NULL);
printf("a: %f\n", a);
/* wait for second thread to complete */
pthread_join(thread2,NULL);
printf("b: %f\n", b);
return 0;
}
void *func1() {
int i, j;
for (j=0;j<4000;j++)
for (i=0;i<1000000;i++)
a = 3.1456*j+i;
return NULL;
}
void *func2() {
int i, j;
for (j=0;j<4000;j++)
for (i=0;i<1000000;i++)
b = 3.1456*j+i;
return NULL;
}
\end{lstlisting}
\end{program}
In Program \ref{pro:threadsExamp1} {\tt func1()} and {\tt func2()} are
slightly different from the functions of the same name used in
Program \ref{pro:serialExamp}. In both these programs these functions
perform the same calculations, but in Program \ref{pro:serialExamp} these
functions returned nothing. However {\tt pthread\_create()} assumes
the function returns a void pointer (i.e., a generic pointer to
memory). Since in this example these functions do not need to return
anything, they simply return {\tt NULL} (which is effectively zero).
If Program \ref{pro:threadsExamp1} is run on a computer that has two
(or more) processors, one should observe that the execution time (as
measured by a ``wall clock'') is about half of what it was for Program
\ref{pro:serialExamp}. Again, assuming the executable version of
Program \ref{pro:threadsExamp1} is named {\tt threads-example1}, the
execution time can be obtained by issuing the command ``{\tt time
threads-example1}''. This timing command will typically return three
values: the ``wall-clock'' time (the actual time that elapsed from the
start to the completion of the program), the CPU time (the sum of time
spent by all processors used to run the program), and system time
(time used by the operating system to run things necessary for your
program to run, but not directly associated with your program). You
should observe that ultimately nearly the same amount of CPU time was
used by both the threaded and serial programs but the threaded program
required about half the wall-clock time. In the case of the second
program two processors were working simultaneously and hence the
wall-clock time was half as much, or nearly so. In fact, there is
slightly more computation involved in the threaded program than the
serial program since there is some computational overhead associated
with the threads.
Let us now modify the first function so that it returns a value,
specifically a pointer to a double where we simply store an arbitrary
number (in this case $10.0$). The appropriate code is shown in
Program \ref{pro:threadsExamp2}.
\begin{program}
{\tt threads-example2.c}: Modified version of Program
\ref{pro:threadsExamp1} where now {\tt func1()} has a return value.
\label{pro:threadsExamp2}
\codemiddle
\begin{lstlisting}
/* threaded implementation -- returning a value */
#include <stdio.h>
#include <stdlib.h> // needed for malloc()
#include <pthread.h>
double *func1(); // now returns a pointer to a double
void *func2();
double a, b;
int main() {
double *c; // used for return value from func1
pthread_t thread1, thread2; // ID's for threads
// typecast the return value of func1 to a void pointer
pthread_create(&thread1, NULL, (void *)func1, NULL);
pthread_create(&thread2, NULL, func2, NULL);
// typecast the address of c to a void pointer to a pointer
pthread_join(thread1,(void **)&c); /*@ \label{threadExample2A} @*/
printf("a,c: %f %f\n", a, *c);
pthread_join(thread2,NULL);
printf("b: %f\n", b);
return 0;
}
double *func1() {
int i, j;
double *c; // c is a pointer to a double
// allocate space to store a double
c=(double *)malloc(sizeof(double)); /*@ \label{threadExample2B} @*/
*c = 10.0;
for (j=0;j<4000;j++)
for (i=0;i<1000000;i++)
a = 3.1456*j+i;
return c;
}
void *func2() {
int i, j;
for (j=0;j<4000;j++)
for (i=0;i<1000000;i++)
b = 3.1456*j+i;
return NULL;
}
\end{lstlisting}
\end{program}
Note that in this new version of {\tt func1()} we declare {\tt c} to
be a pointer to a double and then, in line \ref{threadExample2B},
allocate space where the double can be stored and then, finally, store
$10.0$ at this location. This is rather complicated and it might seem
that a simpler approach would be merely to declare {\tt c} to be a
double and then return the address of {\tt c}, i.e., end the function
with {\tt return \&c}. Unfortunately this would not work. The
problem with that approach is that declaring {\tt c} to be a double
would make it a local variable (one only known to {\tt func1()}) whose
memory would disappear when the function returned.
The second argument of {\tt pthread\_join()} in line
\ref{threadExample2A} provides the pointer to the return value of the
function that was executed by the thread. Since {\tt c} by itself is
a pointer to a double, {\tt \&c} is a pointer to a pointer to a
double, i.e., of type {\tt (double **)}. However, {\tt pthread\_join()}
assumes the second argument is a void pointer to a pointer and hence a
typecast is used to keep the compiler from complaining.
In the next example, shown in Program \ref{pro:threadsExamp3}, {\tt
func1()} and {\tt func2()} are modified so that they each take an
argument. These arguments are pointers to the double variables {\tt d}
and {\tt e} that are set in {\tt main()}.
\begin{program}
{\tt threads-example3.c}: Functions {\tt func1()} and {\tt func2()}
have been modified so that they now take arguments.
\label{pro:threadsExamp3}
\codemiddle
\begin{lstlisting}
/* threaded implementation -- passing arguments and
returning a value */
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
double *func1(double *);
void *func2(double *);
double a, b;
int main() {
double *c; // used for return value from func1
double d=3.0, e=2.0; // arguments passed to functions
pthread_t thread1, thread2; // ID's for threads
pthread_create(&thread1, NULL, (void *)func1, (void *)&d);
pthread_create(&thread2, NULL, (void *)func2, (void *)&e);
pthread_join(thread1,(void **)&c);
printf("a,c: %f %f\n", a, *c);
pthread_join(thread2,NULL);
printf("b: %f\n", b);
return 0;
}
double *func1(double *arg) {
int i, j;
double *c;
c=(double *)malloc(sizeof(double));
*c = 10.0;
for (j=0;j<4000;j++)
for (i=0;i<1000000;i++)
a = (*arg)*j+i;
return c;
}
void *func2(double *arg) {
int i, j;
for (j=0;j<4000;j++)
for (i=0;i<1000000;i++)
b = (*arg)*j+i;
return NULL;
}
\end{lstlisting}
\end{program}
In all these examples {\tt func1()} and {\tt func2()} have performed
essentially the same computation. The only reason there were two
separate functions is that {\tt func1()} set the global variable {\tt
a} while {\tt func2()} set the global variable {\tt b}. However,
knowing that we can both pass arguments and obtain return values, it
is possible to have a single function in our program. It can be
called multiple times and simultaneously. Provided the function does
not use global variables, the different calls will not interfere with
each other.
A program that uses a single function to accomplish what the previous
programs used two functions for is shown in Program
\ref{pro:threadsExamp4}.
\begin{program}
{\tt threads-example4.c}: The global variables have been removed and a
single function {\tt func()} is called twice. The function {\tt
func()} and {\tt main()} communicate by passing arguments and checking
return values (instead of via global variables).
\label{pro:threadsExamp4}
\codemiddle
\begin{lstlisting}
/* threaded implementation -- passing an argument to, and checking
the return value from, a single function */
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
double *func(double *);
int main() {
double *a, *b; // used for return values
double d=3.0, e=2.0;
pthread_t thread1, thread2; // ID's for threads
pthread_create(&thread1, NULL, (void *)func, (void *)&d);
pthread_create(&thread2, NULL, (void *)func, (void *)&e);
pthread_join(thread1,(void **)&a);
printf("a: %f\n", *a);
pthread_join(thread2,(void **)&b);
printf("b: %f\n", *b);
return 0;
}
double *func(double *arg) {
int i, j;
double *a;
a=(double *)malloc(sizeof(double));
for (j=0;j<4000;j++)
for (i=0;i<1000000;i++)
*a = (*arg)*j+i;
return a;
}
\end{lstlisting}
\end{program}
Threads provide a simple way to obtain parallelization. However, one
may find that in practice they do not provide the benefits one might
expect when applied to FDTD programs. FDTD is both computationally
expensive and memory-bandwidth intensive. A great deal of data must
be passed between memory and the CPU. Often the bottleneck is not the
CPU but rather the speed of the ``bus'' that carries data between
memory and the CPU. Multi-processor machines do not have multiple
memory busses. Thus, splitting an FDTD computation between multiple
CPU's on the same computer will have those CPU's all requesting memory
from a bus that is already acting at full capacity. These CPU's will
have to wait on the arrival of the requested memory. Therefore, in
practice when using threaded code with $N$ threads on a computer with
$N$ processors, one is unlikely to see a computation-time reduction
that is anywhere close to the hypothetical maximum reduction of $1/N$.
\section{Message Passing Interface}
The message passing interface (MPI) is a standardized protocol, or set
of protocols, which have been implemented on a wide range of
platforms. MPI facilitates the communication between processes
whether they are running on a single host or multiple hosts. As with
pthreads, MPI provides a large number of functions. These functions
allow the user to control many aspects of the communication or they
greatly simplify what would otherwise be quite cumbersome tasks (such
as the efficient distribution of data to a large number of hosts).
Despite the large number of MPI functions, just six are needed to
begin exploiting the benefits of parallelization.
Before considering those six functions, it needs to be said that one
must have the supporting MPI framework installed on each of the hosts
to be used. Different implementations of the MPI protocol (or the MPI
2 protocol) are available from the Web. For example, LAM MPI is
available from \url{www.lam-mpi.org} but it is no longer being
actively developed. Instead, several MPI-developers have joined
together to work on OpenMPI which is available from
\url{www.open-mpi.org}. Alternatively, MPICH2 is available from
\url{www.mcs.anl.gov/research/projects/mpich2}. Installation of these
packages is relatively trivial (at least that is the case when using
Linux or Mac OS X), but some of the details associated with getting
jobs to run can be somewhat complicated (for example, ensuring that
access is available to remote machines without requiring explicit
typing of a password).
There is a great deal of MPI documentation available from the Web
(there are also a few books written on the subject). A good resource
for getting started is \url{computing.llnl.gov/tutorials/mpi/}.
In fact, Lawrence Livermore has many useful pages related to threads,
MPI, and other aspects of high-performance and parallel processing.
You can find the material by going to \url{computing.llnl.gov} and
following the link to ``Training.''
Returning to the six functions needed to use MPI in a meaningful way,
two of them concern initializing and closing the MPI set-up, two deal
with sending and receiving information, and two deal with determining
the number of processes and which particular process number is
associated with the given invocation. Programs which use MPI must
include the header file {\tt mpi.h}. Before any other MPI functions
are called, the function {\tt MPI\_Init()} must be called. The last
MPI function called should be {\tt MPI\_Finalize()}. Thus, a valid,
but useless, MPI program is shown in Program \ref{pro:uselessmpi}.
\begin{program}
{\tt useless-mpi.c}: A trivial, but valid, MPI
program. \label{pro:uselessmpi}
\codemiddle
\begin{lstlisting}
#include <mpi.h>
#include <stdio.h>
int main(int argc, char *argv[]) {
MPI_Init(&argc,&argv);
printf("I don't do anything useful yet.\n");
MPI_Finalize();
return 0;
}
\end{lstlisting}
\end{program}
When an MPI program is run, each process runs the same program. In
this case, there is nothing to distinguish between the processes.
They will all generate the same line of output. If there were $100$
processes, you would see $100$ lines of ``{\tt I don't do anything
useful yet.}''
\section{Open MPI Basics}
At this point we need to ask the question: What does it mean to run an
MPI program? Ultimately many copies of the same program are run.
Each copy may reside on a different computer and, in fact, multiple
copies may run on the same computer. The details concerning how one
gets these copies to run are somewhat dependent on the MPI package one
uses. Here we will briefly describe the steps associated with the
Open MPI package.
When Open MPI is installed, several executable files will be placed on
your system(s), e.g., {\tt mpicc}, {\tt mpiexec}, {\tt mpirun}, etc.
On most systems by default these files will be installed in the
directory {\tt /usr/local/bin} (but one can specify that the files
should be installed elsewhere if so desired). One must ensure that
the directory where these executables reside is in the search path.
When using MPI it is usually necessary to compile the source code in a
special way. Instead of using the {\tt gcc} compiler on UNIX/Linux
machines, one would use {\tt mpicc} ({\tt mpicc} is merely a wrapper
that ultimately calls the underlying compiler that one would have used
normally). So, for example, to compile the MPI program given above,
one would issue a command such as
\begin{verbatim}
mpicc -Wall -O useless-mpi.c -o useless-mpi
\end{verbatim}
One can now run the executable file {\tt useless-mpi}. The command to
do this is either {\tt mpirun} or {\tt mpiexec} (these commands are
synonymous). There are numerous
arguments that can be specified with the most important being the
number of processes. The following command says to run four copies of
{\tt useless-mpi}:
\begin{verbatim}
mpiexec -np 4 useless-mpi
\end{verbatim}
But where, precisely, is this run? In this case four copies of the
program are run on the local host. That is not precisely what we
want---we are interested in distributing the job to different
machines. There are multiple ways in which one can exercise control
of the running of MPI programs and we will explore just a few.
First, let us assume a ``multicomputer'' consists of five nodes with
names {\tt node01}, {\tt node02}, {\tt node03}, {\tt node04}, and {\tt
node05}. To make things more interesting, let us further assume
that {\tt node01} is one particular brand of computer and the other
nodes are a different brand (e.g., perhaps {\tt node01} is an
Intel-based machine while the other nodes are PowerPC-based machines).
Additionally assume that {\tt node01} has four processors while each
of the other nodes has two processors.
We can specify some of this information in a ``hostfile.'' For now, let us
exclude {\tt node01} since it is a different architecture. A hostfile
that describes a multicomputer consisting of the remaining four nodes might be
\begin{verbatim}
node05 slots=2
node04 slots=2
node03 slots=2
node02 slots=2
\end{verbatim}
Let us assume this information is stored in the file {\tt my\_hostfile}.
The {\tt slots} are the number of processors on a particular machine.
If one does not specify the number of slots, it is assumed to be one.
Let us further assume the executable {\tt useless-mpi} exists in a
directory called \verb+~/Ompi+ (where the tilde is recognized as a
shorthand for the user's home directory on a UNIX/Linux machine). If
all the computers mount the same file structure, this may actually be
the exact same directory that all the machines are sharing. In that
case there would only be one copy of {\tt useless-mpi}.
Alternatively, each of the computers may have their own local copy of
a directory named \verb+~/Ompi+. In that case there would have to be
a local copy of the executable file {\tt useless-mpi} present on each
of the individual computers.
One could now run eight copies of the program by issuing the following
command:
\begin{verbatim}
mpirun -np 8 -hostfile my_hostfile ~/Ompi/useless-mpi
\end{verbatim}
This command could be issued from any of the nodes. Note that the
number of processes does not have to match the number of slots. The
following command will launch $12$ copies of the program
\begin{verbatim}
mpirun -np 12 -hostfile my_hostfile ~/Ompi/useless-mpi
\end{verbatim}
However, it will generally be best if one can match the job to the
physical configuration of the multicomputer, i.e., one job per
``slot.''
In order to incorporate {\tt node01} into the multicomputer, things
become slightly more complicated because executables compiled for {\tt
node01} will not run on the other nodes and vice versa. Thus one
must compile separate versions of the program on the different
machines. Let's assume that was done and on each of the nodes a copy
of {\tt useless-mpi} was placed in the local directory {\tt /tmp}
(i.e., there is a copy of this directory and this executable on each
of the nodes). The hostfile {\tt my\_hostfile} could then be changed
to
\begin{verbatim}
node05 slots=2
node04 slots=2
node03 slots=2
node02 slots=2
node01 slots=4
\end{verbatim}
Note that there are four slots specified for {\tt node01} instead of
two. The command to run $12$ copies of the program would now be
\begin{verbatim}
mpirun -np 12 -hostfile my_hostfile /tmp/useless-mpi
\end{verbatim}
By introducing more arguments to the command line, one can exercise more
fine-grained control of the execution of the program. Let us again
assume that there is one common directory \verb+~/Ompi+ that all the
machines share. Let us further assume two versions of {\tt
useless-mpi} have been compiled: one for PowerPC-based machines
called {\tt useless-mpi-ppc} and one for Intel-based machines called
{\tt useless-mpi-intel}. We can use a command that does away with the
hostfile and instead provide all the details explicitly:
\begin{verbatim}
mpirun -host node05,node04,node03,node02 \
-np 8 ~/Ompi/useless-mpi-ppc \
-host node01 -np 4 ~/Ompi/useless-mpi-intel
\end{verbatim}
Note that the backslashes here are quoting the end of the line. This
command can be given on a single line or can be given on multiple
lines, as shown here, if one ``quotes'' the carriage return.
\section{Rank and Size}
To do more meaningful tasks, it is typically necessary for each
process to know how many total processes there are and which process
number is assigned to a particular invocation. In this way, each
process can do something different based on its process number. In
MPI the process number is known as the rank. The number of processes
can be determined with the function {\tt MPI\_Comm\_size()} and the
rank can be determined with {\tt MPI\_Comm\_rank()}. The code shown
in Program \ref{pro:findrank} is a slight modification of the previous
program that now incorporates these functions.
\begin{program}
{\tt find-rank.c}: An MPI program where each process can determine the
total number of processes and its individual rank (i.e., process
number). \label{pro:findrank}
\codemiddle
\begin{lstlisting}
#include <mpi.h>
#include <stdio.h>
/* Report this process's rank and the total number of processes. */
int main(int argc, char *argv[]) {
  int size;   /* total number of processes */
  int rank;   /* number of this process */

  /* start MPI */
  MPI_Init(&argc, &argv);

  /* query the communicator holding all of the processes */
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  printf("I have rank %d out of %d total.\n", rank, size);

  /* shut MPI down */
  MPI_Finalize();
  return 0;
}
\end{lstlisting}
\end{program}
Assume this is run with four total processes. The output will be
similar to this:
\begin{code}
I have rank 1 out of 4 total.
I have rank 0 out of 4 total.
I have rank 2 out of 4 total.
I have rank 3 out of 4 total.
\end{code}
Note that the {\tt size} is $4$, but {\tt rank} ranges between $0$ and
$3$ (i.e., ${\tt size}-1$). Also note that there is no guarantee that
the processes will report in rank order.
The argument {\tt MPI\_COMM\_WORLD} is known as an MPI communicator.
A communicator essentially specifies the processes which are grouped
together. One can create different communicators, i.e., group
different sets of processes together, and this can simplify handling
certain tasks for a particular problem. However, we will simply use
{\tt MPI\_COMM\_WORLD} which specifies all the processes.
\section{Communicating Between Processes}
To communicate between processes we can use the commands {\tt
MPI\_Send()} and {\tt MPI\_Recv()}. {\tt MPI\_Send()} has arguments of
the form:
\begin{code}
MPI_Send(&buffer, // address where data stored
count, // number of items to send
type, // type of data to send
dest, // rank of destination process
tag, // programmer-specified ID
comm); // MPI communicator
\end{code}
where {\tt buffer} is an address where the data to be sent is stored
(for example, the address of the start of an array), {\tt count} is
the number of elements or items to be sent, {\tt type} is the type of
data to be sent, {\tt dest} is the rank of the process to which this
information is being sent, {\tt tag} is a programmer-specified number
to identify this data, and {\tt comm} is an MPI communicator (which we
will leave as {\tt MPI\_COMM\_WORLD}). The {\tt type} is similar to
the standard C data types, but it is specified using MPI designations.
Some of those are: {\tt MPI\_INT}, {\tt MPI\_FLOAT}, and {\tt
MPI\_DOUBLE}, corresponding to the C data types of int, float, and
double (other types, some of which are specific to MPI, such as {\tt
MPI\_BYTE} and {\tt MPI\_PACKED}, exist too).
{\tt MPI\_Recv()} has arguments of the form:
\begin{verbatim}
MPI_Recv(&buffer,count,type,source,tag,comm,&status);
\end{verbatim}
In this case {\tt buffer} is the address where the received data is to
be stored. The meanings of {\tt count}, {\tt type}, {\tt tag}, and
{\tt comm} are unchanged from before. {\tt source} is the rank of the
process sending the data. The {\tt status} is a pointer to a
structure, specifically an {\tt MPI\_Status} structure which is
specified in {\tt mpi.h}. This structure contains the rank of the
source and the {\tt tag} number.
Program \ref{pro:sendrecv} demonstrates the use of {\tt MPI\_Send()} and
{\tt MPI\_Recv()}. Here the process with rank $0$ serves as the master
process. It collects input from the user which will subsequently be
sent to the other processes. Specifically, the master process prompts
the user for as many values (doubles) as the number of processes
minus one. The master process then sends one number to each of the
other processes. These processes do a calculation based on the
number they receive and then send the result back to the master. The
master prints this received data and then the program terminates.
\begin{program}
{\tt sendrecv.c}: An MPI program that sends information back and forth
between a master process and slave processes. \label{pro:sendrecv}
\codemiddle
\begin{lstlisting}
#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
/* The process with rank 0 (the "master") reads size-1 numbers from the
 * user, sends one to each of the other processes, and prints the
 * results they send back.  Every other process receives one number,
 * does a calculation with it, and returns the result to the master. */
int main(int argc, char *argv[]) {
  int i, rank, size, tag_out=10, tag_in=11;
  MPI_Status status;

  MPI_Init(&argc,&argv);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  if (rank==0) {
    /* "master" process collects and distributes input */
    double *a, *b;

    /* allocate space for input and result */
    a=malloc((size-1)*sizeof(double));
    b=malloc((size-1)*sizeof(double));

    /* prompt user for input */
    printf("Enter %d numbers: ",size-1);
    for (i=0; i<size-1; i++)
      if (scanf("%lf",a+i)!=1)
        a[i]=0.0; /* unreadable input: send zero, not garbage */

    /* send values to other processes */
    for (i=0; i<size-1; i++)
      MPI_Send(a+i,1,MPI_DOUBLE,i+1,tag_out,MPI_COMM_WORLD);

    /* receive results calculated by other process */
    for (i=0; i<size-1; i++)
      MPI_Recv(b+i,1,MPI_DOUBLE,i+1,tag_in,MPI_COMM_WORLD,&status);
    for (i=0; i<size-1; i++)
      printf("%f\n",b[i]);

    /* release the input and result arrays */
    free(a);
    free(b);
  } else {
    /* "slave" process */
    int j;
    double c, d;
    /* receive input from the master process */
    MPI_Recv(&c,1,MPI_DOUBLE,0,tag_out,MPI_COMM_WORLD,&status);
    /* do some silly number crunching */
    for (j=0;j<4000;j++)
      for (i=0;i<100000;i++)
        d = c*j+i;
    /* send the result back to master */
    MPI_Send(&d,1,MPI_DOUBLE,0,tag_in,MPI_COMM_WORLD);
  }

  MPI_Finalize();
  return 0;
}
\end{lstlisting}
\end{program}
The six commands covered so far are sufficient to parallelize any
number of problems. However, there is some computational overhead
associated with parallelizing the code. Additionally, there is often
a significant cost associated with communication between processes,
especially if those processes are running on different hosts and the
network linking those hosts is slow.
The functions {\tt MPI\_Send()} and {\tt MPI\_Recv()} are blocking
commands. They do not return until they have accomplished the
requested send or receive. In some cases, especially if there is a
large amount of data to transmit, this can be costly. There are also
nonblocking or ``immediate'' versions of these functions. For these
functions control is returned to the calling function without a
guarantee of the send or receive having been accomplished. In this
way the program can continue some other useful task while the
communication is taking place. When one must ensure that the
communication is finished, the function {\tt MPI\_Wait()} provides a
blocking mechanism that suspends execution until the specified
communication is completed. The immediate send and receive functions
are of the form:
\begin{verbatim}
MPI_Isend(&buffer,count,type,dest,tag,comm,&request);
MPI_Irecv(&buffer,count,type,source,tag,comm,&request);
\end{verbatim}
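The arguments to these functions are the same as for the blocking
versions except that the final argument is a pointer to an {\tt
MPI\_Request} handle. For {\tt MPI\_Irecv()} this takes the place of
the pointer to an {\tt MPI\_Status}, while for {\tt MPI\_Isend()} it is
an additional argument. The wait command has the
following form: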
\begin{verbatim}
MPI_Wait(&request,&status);
\end{verbatim}
Note that the communication for which the waiting is being done is
specified by the ``{\tt request},'' not the ``{\tt status}.'' So, if
there are multiple transmissions which are being done asynchronously,
one may have to create an array of {\tt MPI\_Request}'s. If one is
not concerned with the status of the transmissions, one does not have
to define a separate status for each transmission.
The code shown in Program \ref{pro:nonblocking} illustrates the use of
non-blocking send and receive. In this case the master process sends
the numbers to the other processes via {\tt MPI\_Isend()}. However,
the master does not bother to ensure that the send was performed.
Instead, the master will ultimately wait for the other processes to
communicate the result back. The fact that the other processes are
sending information back serves as confirmation that the data was sent
from the master. After sending the data, the master process then
calls {\tt MPI\_Irecv()}. There is one call for each of the ``slave''
processes. After calling these functions, {\tt MPI\_Wait()} is used
to ensure the data has been received before printing the results. The
code associated with the slave processes is unchanged from before.
\begin{program}
{\tt nonblocking.c}: An MPI program that uses non-blocking sends and
receives. \label{pro:nonblocking}
\codemiddle
\begin{lstlisting}
#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
/* The process with rank 0 (the "master") reads size-1 numbers from the
 * user and hands one to each of the other processes with MPI_Isend().
 * It posts an MPI_Irecv() for each result and uses MPI_Wait() to be
 * sure the results have arrived before printing them.  The other
 * processes use the blocking MPI_Recv() and MPI_Send(). */
int main(int argc, char *argv[]) {
  int i, rank, size, tag_out=10, tag_in=11;
  MPI_Status status;
  MPI_Request *request_snd, *request_rcv;

  MPI_Init(&argc,&argv);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  if (rank==0) {
    /* "master" process collects and distributes input */
    double *a, *b;

    /* allocate space for input and result */
    a=malloc((size-1)*sizeof(double));
    b=malloc((size-1)*sizeof(double));

    /* allocate space for the send and receive requests */
    request_snd=malloc((size-1)*sizeof(MPI_Request));
    request_rcv=malloc((size-1)*sizeof(MPI_Request));

    /* prompt user for input */
    printf("Enter %d numbers: ",size-1);
    for (i=0; i<size-1; i++)
      if (scanf("%lf",a+i)!=1)
        a[i]=0.0; /* unreadable input: send zero, not garbage */

    /* non-blocking send of values to other processes */
    for (i=0; i<size-1; i++)
      MPI_Isend(a+i,1,MPI_DOUBLE,i+1,tag_out,MPI_COMM_WORLD,
                request_snd+i);

    /* non-blocking reception of results calculated by other process */
    for (i=0; i<size-1; i++)
      MPI_Irecv(b+i,1,MPI_DOUBLE,i+1,tag_in,MPI_COMM_WORLD,
                request_rcv+i);

    /* wait until we have received all the results */
    for (i=0; i<size-1; i++)
      MPI_Wait(request_rcv+i,&status);
    for (i=0; i<size-1; i++)
      printf("%f\n",b[i]);

    /* The results could only be computed from data that was
     * delivered, so these waits should not hold us up.  They are
     * still required: each request must be completed before
     * MPI_Finalize() is called. */
    for (i=0; i<size-1; i++)
      MPI_Wait(request_snd+i,&status);

    /* release the data arrays and the request arrays */
    free(a);
    free(b);
    free(request_snd);
    free(request_rcv);
  } else {
    /* "slave" process */
    int j;
    double c, d;
    /* receive input from the master process */
    MPI_Recv(&c,1,MPI_DOUBLE,0,tag_out,MPI_COMM_WORLD,&status);
    /* do some silly number crunching */
    for (j=0;j<4000;j++)
      for (i=0;i<100000;i++)
        d = c*j+i;
    /* send the result back to master */
    MPI_Send(&d,1,MPI_DOUBLE,0,tag_in,MPI_COMM_WORLD);
  }

  MPI_Finalize();
  return 0;
}
\end{lstlisting}
\end{program}