Source code for pymor.vectorarrays.mpi

# This file is part of the pyMOR project (http://www.pymor.org).
# Copyright 2013-2020 pyMOR developers and contributors. All rights reserved.
# License: BSD 2-Clause License (http://opensource.org/licenses/BSD-2-Clause)

"""Wrapper classes for building MPI distributed |VectorArrays|.

This module contains several wrapper classes which allow to
transform single rank |VectorArrays| into MPI distributed
|VectorArrays| which can be used on rank 0 like ordinary
|VectorArrays|.

The implementations are based on the event loop provided
by :mod:`pymor.tools.mpi`.
"""

import numpy as np

from pymor.tools import mpi
from pymor.vectorarrays.interface import VectorArray, VectorSpace


[docs]class MPIVectorArray(VectorArray): """MPI distributed |VectorArray|. Given a local |VectorArray| on each MPI rank, this wrapper class uses the event loop from :mod:`pymor.tools.mpi` to build a global MPI distributed vector array from these local arrays. Instances of `MPIVectorArray` can be used on rank 0 like any other (non-distributed) |VectorArray|. Note, however, that the implementation of the local VectorArrays needs to be MPI aware. For instance, `cls.inner` must perform the needed MPI communication to sum up the local inner products and return the sums on rank 0. Default implementations for all communication requiring interface methods are provided by :class:`MPIVectorArrayAutoComm` (also see :class:`MPIVectorArrayNoComm`). Note that resource cleanup is handled by :meth:`object.__del__`. Please be aware of the peculiarities of destructors in Python! The associated |VectorSpace| is :class:`MPIVectorSpace`. """ def __init__(self, obj_id, space): self.obj_id = obj_id self.space = space
[docs] def __len__(self): return mpi.call(mpi.method_call, self.obj_id, '__len__')
[docs] def __getitem__(self, ind): U = type(self)(mpi.call(mpi.method_call_manage, self.obj_id, '__getitem__', ind), self.space) U.is_view = True return U
[docs] def __delitem__(self, ind): mpi.call(mpi.method_call, self.obj_id, '__delitem__', ind)
[docs] def copy(self, deep=False): return type(self)(mpi.call(mpi.method_call_manage, self.obj_id, 'copy', deep=deep), self.space)
[docs] def append(self, other, remove_from_other=False): mpi.call(mpi.method_call, self.obj_id, 'append', other.obj_id, remove_from_other=remove_from_other)
[docs] def scal(self, alpha): mpi.call(mpi.method_call, self.obj_id, 'scal', alpha)
[docs] def axpy(self, alpha, x): mpi.call(_MPIVectorArray_axpy, self.obj_id, alpha, x.obj_id)
[docs] def inner(self, other, product=None): if product is not None: return product.apply2(self, other) return mpi.call(mpi.method_call, self.obj_id, 'inner', other.obj_id)
[docs] def pairwise_inner(self, other, product=None): if product is not None: return product.pairwise_apply2(self, other) return mpi.call(mpi.method_call, self.obj_id, 'pairwise_inner', other.obj_id)
[docs] def lincomb(self, coefficients): return type(self)(mpi.call(mpi.method_call_manage, self.obj_id, 'lincomb', coefficients), self.space)
[docs] def _norm(self): return mpi.call(mpi.method_call, self.obj_id, '_norm')
[docs] def _norm2(self): return mpi.call(mpi.method_call, self.obj_id, '_norm2')
[docs] def dofs(self, dof_indices): return mpi.call(mpi.method_call, self.obj_id, 'dofs', dof_indices)
[docs] def amax(self): return mpi.call(mpi.method_call, self.obj_id, 'amax')
def __del__(self): mpi.call(mpi.remove_object, self.obj_id)
[docs]class MPIVectorSpace(VectorSpace): """|VectorSpace| of :class:`MPIVectorArrays <MPIVectorArray>`. Parameters ---------- local_spaces `tuple` of the different |VectorSpaces| of the local |VectorArrays| on the MPI ranks. Alternatively, the length of `local_spaces` may be 1, in which case the same |VectorSpace| is assumed for all ranks. """ array_type = MPIVectorArray def __init__(self, local_spaces): self.local_spaces = tuple(local_spaces) if type(local_spaces[0]) is RegisteredLocalSpace: self.id = _local_space_registry[local_spaces[0]].id else: self.id = local_spaces[0].id
[docs] def make_array(self, obj_id): """Create array from rank-local |VectorArray| instances. Parameters ---------- obj_id :class:`~pymor.tools.mpi.ObjectId` of the MPI distributed instances of `cls` wrapped by this array. Returns ------- The newly created :class:`MPIVectorArray`. """ assert mpi.call(_MPIVectorSpace_check_local_spaces, self.local_spaces, obj_id) return self.array_type(obj_id, self)
[docs] def zeros(self, count=1, reserve=0): return self.array_type( mpi.call(_MPIVectorSpace_zeros, self.local_spaces, count=count, reserve=reserve), self )
@property def dim(self): return mpi.call(_MPIVectorSpace_dim, self.local_spaces)
[docs] def __eq__(self, other): return type(other) is MPIVectorSpace and \ len(self.local_spaces) == len(other.local_spaces) and \ all(ls == ols for ls, ols in zip(self.local_spaces, other.local_spaces))
[docs] def __repr__(self): return f'{self.__class__}({self.local_spaces}, {self.id})'
[docs]class RegisteredLocalSpace(int):
[docs] def __repr__(self): return f'{_local_space_registry[self]} (id: {int(self)})'
_local_space_registry = {} _local_space_to_id = {} def _register_local_space(local_space): local_space_id = _local_space_to_id.get(local_space) if local_space_id is None: local_space_id = RegisteredLocalSpace(len(_local_space_registry)) _local_space_registry[local_space_id] = local_space _local_space_to_id[local_space] = local_space_id return local_space_id def _get_local_space(local_spaces): local_space = local_spaces[mpi.rank] if len(local_spaces) > 1 else local_spaces[0] if type(local_space) is RegisteredLocalSpace: local_space = _local_space_registry[local_space] return local_space def _MPIVectorSpace_zeros(local_spaces=(None,), count=0, reserve=0): local_space = _get_local_space(local_spaces) obj = local_space.zeros(count=count, reserve=reserve) return mpi.manage_object(obj) def _MPIVectorSpace_dim(local_spaces): local_space = _get_local_space(local_spaces) return local_space.dim def _MPIVectorSpace_check_local_spaces(local_spaces, obj_id): U = mpi.get_object(obj_id) local_space = _get_local_space(local_spaces) results = mpi.comm.gather(U in local_space, root=0) if mpi.rank0: return np.all(results) def _MPIVectorArray_axpy(obj_id, alpha, x_obj_id): obj = mpi.get_object(obj_id) x = mpi.get_object(x_obj_id) obj.axpy(alpha, x)
[docs]class MPIVectorArrayNoComm(MPIVectorArray): """MPI distributed |VectorArray|. This is a subclass of :class:`MPIVectorArray` which overrides all communication requiring interface methods to raise `NotImplementedError`. This is mainly useful as a security measure when wrapping arrays for which simply calling the respective method on the wrapped arrays would lead to wrong results and :class:`MPIVectorArrayAutoComm` cannot be used either (for instance in the presence of shared DOFs). The associated |VectorSpace| is :class:`MPIVectorSpaceNoComm`. """
[docs] def inner(self, other, product=None): if product is not None: return product.apply2(self, other) raise NotImplementedError
[docs] def pairwise_inner(self, other, product=None): if product is not None: return product.pairwise_apply2(self, other) raise NotImplementedError
[docs] def _norm(self): raise NotImplementedError
[docs] def _norm2(self): raise NotImplementedError
[docs] def dofs(self, dof_indices): raise NotImplementedError
[docs] def amax(self): raise NotImplementedError
[docs]class MPIVectorSpaceNoComm(MPIVectorSpace): """|VectorSpace| for :class:`MPIVectorArrayNoComm`.""" array_type = MPIVectorArrayNoComm @property def dim(self): raise NotImplementedError
[docs]class MPIVectorArrayAutoComm(MPIVectorArray): """MPI distributed |VectorArray|. This is a subclass of :class:`MPIVectorArray` which provides default implementations for all communication requiring interface methods for the case when the wrapped array is not MPI aware. Note, however, that depending on the model these default implementations might lead to wrong results (for instance in the presence of shared DOFs). The associated |VectorSpace| is :class:`MPIVectorSpaceAutoComm`. """
[docs] def inner(self, other, product=None): if product is not None: return product.apply2(self, other) return mpi.call(_MPIVectorArrayAutoComm_inner, self.obj_id, other.obj_id)
[docs] def pairwise_inner(self, other, product=None): if product is not None: return product.pairwise_apply2(self, other) return mpi.call(_MPIVectorArrayAutoComm_pairwise_inner, self.obj_id, other.obj_id)
[docs] def _norm(self): return mpi.call(_MPIVectorArrayAutoComm_norm, self.obj_id)
[docs] def _norm2(self): return mpi.call(_MPIVectorArrayAutoComm_norm2, self.obj_id)
[docs] def dofs(self, dof_indices): offsets = getattr(self, '_offsets', None) if offsets is None: offsets = self.space._get_dims()[1] dof_indices = np.array(dof_indices) return mpi.call(_MPIVectorArrayAutoComm_dofs, self.obj_id, offsets, dof_indices)
[docs] def amax(self): offsets = getattr(self, '_offsets', None) if offsets is None: offsets = self.space._get_dims()[1] inds, vals = mpi.call(_MPIVectorArrayAutoComm_amax, self.obj_id) inds += offsets[:, np.newaxis] max_inds = np.argmax(vals, axis=0) # np.choose does not work due to # https://github.com/numpy/numpy/issues/3259 return (np.array([inds[max_inds[i], i] for i in range(len(max_inds))]), np.array([vals[max_inds[i], i] for i in range(len(max_inds))]))
[docs]class MPIVectorSpaceAutoComm(MPIVectorSpace): """|VectorSpace| for :class:`MPIVectorArrayAutoComm`.""" array_type = MPIVectorArrayAutoComm @property def dim(self): dim = getattr(self, '_dim', None) if dim is None: dim = self._get_dims()[0] return dim def _get_dims(self): dims = mpi.call(_MPIVectorSpaceAutoComm_dim, self.local_spaces) self._offsets = offsets = np.cumsum(np.concatenate(([0], dims)))[:-1] self._dim = dim = sum(dims) return dim, offsets
def _MPIVectorSpaceAutoComm_dim(local_spaces): local_space = _get_local_space(local_spaces) dims = mpi.comm.gather(local_space, root=0) if mpi.rank0: return dims def _MPIVectorArrayAutoComm_inner(self, other): self = mpi.get_object(self) other = mpi.get_object(other) local_results = self.inner(other) assert local_results.dtype == np.float64 results = np.empty((mpi.size,) + local_results.shape, dtype=np.float64) if mpi.rank0 else None mpi.comm.Gather(local_results, results, root=0) if mpi.rank0: return np.sum(results, axis=0) def _MPIVectorArrayAutoComm_pairwise_inner(self, other): self = mpi.get_object(self) other = mpi.get_object(other) local_results = self.pairwise_inner(other) assert local_results.dtype == np.float64 results = np.empty((mpi.size,) + local_results.shape, dtype=np.float64) if mpi.rank0 else None mpi.comm.Gather(local_results, results, root=0) if mpi.rank0: return np.sum(results, axis=0) def _MPIVectorArrayAutoComm_norm(self): self = mpi.get_object(self) local_results = self._norm2() assert local_results.dtype == np.float64 results = np.empty((mpi.size,) + local_results.shape, dtype=np.float64) if mpi.rank0 else None mpi.comm.Gather(local_results, results, root=0) if mpi.rank0: return np.sqrt(np.sum(results, axis=0)) def _MPIVectorArrayAutoComm_norm2(self): self = mpi.get_object(self) local_results = self._norm2() assert local_results.dtype == np.float64 results = np.empty((mpi.size,) + local_results.shape, dtype=np.float64) if mpi.rank0 else None mpi.comm.Gather(local_results, results, root=0) if mpi.rank0: return np.sum(results, axis=0) def _MPIVectorArrayAutoComm_dofs(self, offsets, dof_indices): self = mpi.get_object(self) offset = offsets[mpi.rank] dim = self.dim my_indices = np.logical_and(dof_indices >= offset, dof_indices < offset + dim) local_results = np.zeros((len(self), len(dof_indices))) local_results[:, my_indices] = self.dofs(dof_indices[my_indices] - offset) assert local_results.dtype == np.float64 results = np.empty((mpi.size,) + local_results.shape, dtype=np.float64) if mpi.rank0 else None mpi.comm.Gather(local_results, results, root=0) if mpi.rank0: return np.sum(results, axis=0) def _MPIVectorArrayAutoComm_amax(self): self = mpi.get_object(self) local_inds, local_vals = self.amax() assert local_inds.dtype == np.int64 assert local_vals.dtype == np.float64 inds = np.empty((mpi.size,) + local_inds.shape, dtype=np.int64) if mpi.rank0 else None vals = np.empty((mpi.size,) + local_inds.shape, dtype=np.float64) if mpi.rank0 else None mpi.comm.Gather(local_inds, inds, root=0) mpi.comm.Gather(local_vals, vals, root=0) if mpi.rank0: return inds, vals