Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

""" 

This module contains functions and wrappers for running code in parallel. 

""" 

from __future__ import print_function 

from builtins import range 

from multiprocessing import Pool, cpu_count 

import math 

 

 

def _wrapped_f(argstup): 

""" 

Wrapper for the function to run in parallel. 

 

:param argstup: arguments 

:type argstup: tuple 

:return: function return 

""" 

# if has_mpi: 

# print("Rank %s/%s, node: %s" %(comm.rank, comm.size, MPI.Get_processor_name())) 

func = argstup[0] 

args = argstup[1] 

kwargs = argstup[2] 

return func(*args, **kwargs) 

 

 

def parfor(func, args=None, kwargs=None, num_processes=None, chunksize=16, maxtasksperchild=None):
    """
    Run ``func`` once per job in a :class:`multiprocessing.Pool`.

    Job ``j`` is executed as ``func(*args[j], **kwargs[j])``. Either ``args``
    or ``kwargs`` may be omitted, in which case every job is called without
    positional (respectively keyword) arguments. The jobs are submitted to the
    pool in consecutive chunks of at most ``chunksize`` jobs, and progress
    messages are printed for every chunk.

    If both ``args`` and ``kwargs`` are given but differ in length, a warning
    is printed and the number of jobs is taken from ``args``; a shorter
    ``kwargs`` then results in an :class:`IndexError`.

    :param func: function to run; it is passed to the pool workers
    :type func: function
    :param args: one sequence of positional arguments per job
    :type args: list of tuple
    :param kwargs: one mapping of keyword arguments per job
    :type kwargs: list of dict
    :param num_processes: number of worker processes; defaults to
        :func:`multiprocessing.cpu_count`
    :type num_processes: int
    :param chunksize: maximum number of jobs handed to the pool at once
    :type chunksize: int
    :param maxtasksperchild: maximum number of tasks to be run per child
        process. This is a :class:`multiprocessing.Pool` argument
    :type maxtasksperchild: int
    :return: list of results, in job order; empty if there are no jobs
    :rtype: list
    :raises ValueError: if ``chunksize`` is smaller than 1
    """
    if chunksize < 1:
        raise ValueError("chunksize must be a positive integer, got %s" % (chunksize,))

    if args and kwargs:
        num_jobs = len(args)
        if num_jobs != len(kwargs):
            print("Warning: Different numbers of args and kwargs for function. ")
    elif args:
        num_jobs = len(args)
        # Empty mappings (not None) so that ``**kwargs[j]`` is always valid.
        kwargs = [{} for _ in args]
    elif kwargs:
        num_jobs = len(kwargs)
        # Empty tuples (not None) so that ``*args[j]`` is always valid.
        args = [() for _ in kwargs]
    else:
        # Nothing to run: do not pay for spawning worker processes.
        return []

    if num_processes is None:
        num_processes = cpu_count()
    print("Initializing multiprocessing.Pool with %s workers, %s tasks/child" %(num_processes, maxtasksperchild))
    pool = Pool(processes=num_processes, maxtasksperchild=maxtasksperchild)

    # Ceiling division without going through floats.
    num_chunks = (num_jobs + chunksize - 1) // chunksize
    results = []
    try:
        for chunk_index, start in enumerate(range(0, num_jobs, chunksize)):
            stop = min(start + chunksize, num_jobs)
            print("Copying arguments for parallel pool, chunk %s of %s, size %s" %(chunk_index + 1, num_chunks, stop - start))
            argslist = [(func, args[j], kwargs[j]) for j in range(start, stop)]
            print("Running chunk")
            results.extend(pool.map(_wrapped_f, argslist))
    except BaseException:
        # A job failed or the run was interrupted: stop the workers right
        # away instead of leaving them alive behind the propagating error.
        pool.terminate()
        raise
    else:
        pool.close()
    finally:
        pool.join()
    return results