Source code for pymor.discretizers.builtin.fv

# This file is part of the pyMOR project (http://www.pymor.org).
# Copyright 2013-2020 pyMOR developers and contributors. All rights reserved.
# License: BSD 2-Clause License (http://opensource.org/licenses/BSD-2-Clause)

""" This module provides some operators for finite volume discretizations."""
from functools import partial

import numpy as np
from scipy.sparse import coo_matrix, csc_matrix, dia_matrix

from pymor.algorithms.preassemble import preassemble as preassemble_
from pymor.algorithms.timestepping import ExplicitEulerTimeStepper, ImplicitEulerTimeStepper
from pymor.analyticalproblems.elliptic import StationaryProblem
from pymor.analyticalproblems.functions import Function, LincombFunction
from pymor.analyticalproblems.instationary import InstationaryProblem
from pymor.core.base import abstractmethod
from pymor.core.defaults import defaults
from pymor.discretizers.builtin.domaindiscretizers.default import discretize_domain_default
from pymor.discretizers.builtin.grids.interfaces import AffineGridWithOrthogonalCenters
from pymor.discretizers.builtin.grids.referenceelements import line, triangle, square
from pymor.discretizers.builtin.grids.subgrid import SubGrid, make_sub_grid_boundary_info
from pymor.discretizers.builtin.gui.visualizers import PatchVisualizer, OnedVisualizer
from pymor.discretizers.builtin.inplace import iadd_masked, isub_masked
from pymor.discretizers.builtin.quadratures import GaussQuadratures
from pymor.models.basic import StationaryModel, InstationaryModel
from pymor.operators.constructions import ComponentProjection, LincombOperator, ZeroOperator
from pymor.operators.interface import Operator
from pymor.operators.numpy import NumpyGenericOperator, NumpyMatrixBasedOperator, NumpyMatrixOperator
from pymor.parameters.base import ParametricObject
from pymor.vectorarrays.numpy import NumpyVectorSpace


[docs]def FVVectorSpace(grid, id='STATE'): return NumpyVectorSpace(grid.size(0), id)
[docs]class NumericalConvectiveFlux(ParametricObject): """Interface for numerical convective fluxes for finite volume schemes. Numerical fluxes defined by this interfaces are functions of the form `F(U_inner, U_outer, unit_outer_normal, edge_volume, mu)`. The flux evaluation is vectorized and happens in two stages: 1. `evaluate_stage1` receives a |NumPy array| `U` of all values which appear as `U_inner` or `U_outer` for all edges the flux shall be evaluated at and returns a `tuple` of |NumPy arrays| each of the same length as `U`. 2. `evaluate_stage2` receives the reordered `stage1_data` for each edge as well as the unit outer normal and the volume of the edges. `stage1_data` is given as follows: If `R_l` is `l`-th entry of the `tuple` returned by `evaluate_stage1`, the `l`-th entry `D_l` of of the `stage1_data` tuple has the shape `(num_edges, 2) + R_l.shape[1:]`. If for edge `k` the values `U_inner` and `U_outer` are the `i`-th and `j`-th value in the `U` array provided to `evaluate_stage1`, we have :: D_l[k, 0] == R_l[i], D_l[k, 1] == R_l[j]. `evaluate_stage2` returns a |NumPy array| of the flux evaluations for each edge. """ @abstractmethod def evaluate_stage1(self, U, mu=None): pass @abstractmethod def evaluate_stage2(self, stage1_data, unit_outer_normals, volumes, mu=None): pass
[docs]class LaxFriedrichsFlux(NumericalConvectiveFlux): """Lax-Friedrichs numerical flux. If `f` is the analytical flux, the Lax-Friedrichs flux `F` is given by:: F(U_in, U_out, normal, vol) = vol * [normal⋅(f(U_in) + f(U_out))/2 + (U_in - U_out)/(2*λ)] Parameters ---------- flux |Function| defining the analytical flux `f`. lxf_lambda The stabilization parameter `λ`. """ def __init__(self, flux, lxf_lambda=1.0): self.__auto_init(locals()) def evaluate_stage1(self, U, mu=None): return U, self.flux(U[..., np.newaxis], mu) def evaluate_stage2(self, stage1_data, unit_outer_normals, volumes, mu=None): U, F = stage1_data return (np.sum(np.sum(F, axis=1) * unit_outer_normals, axis=1) * 0.5 + (U[..., 0] - U[..., 1]) * (0.5 / self.lxf_lambda)) * volumes
[docs]class SimplifiedEngquistOsherFlux(NumericalConvectiveFlux): """Engquist-Osher numerical flux. Simplified Implementation for special case. For the definition of the Engquist-Osher flux see :class:`EngquistOsherFlux`. This class provides a faster and more accurate implementation for the special case that `f(0) == 0` and the derivative of `f` only changes sign at `0`. Parameters ---------- flux |Function| defining the analytical flux `f`. flux_derivative |Function| defining the analytical flux derivative `f'`. """ def __init__(self, flux, flux_derivative): self.__auto_init(locals()) def evaluate_stage1(self, U, mu=None): return self.flux(U[..., np.newaxis], mu), self.flux_derivative(U[..., np.newaxis], mu) def evaluate_stage2(self, stage1_data, unit_outer_normals, volumes, mu=None): F_edge, F_d_edge = stage1_data unit_outer_normals = unit_outer_normals[:, np.newaxis, :] F_d_edge = np.sum(F_d_edge * unit_outer_normals, axis=2) F_edge = np.sum(F_edge * unit_outer_normals, axis=2) F_edge[:, 0] = np.where(np.greater_equal(F_d_edge[:, 0], 0), F_edge[:, 0], 0) F_edge[:, 1] = np.where(np.less_equal(F_d_edge[:, 1], 0), F_edge[:, 1], 0) F_edge = np.sum(F_edge, axis=1) F_edge *= volumes return F_edge
[docs]class EngquistOsherFlux(NumericalConvectiveFlux): """Engquist-Osher numerical flux. If `f` is the analytical flux, and `f'` its derivative, the Engquist-Osher flux is given by:: F(U_in, U_out, normal, vol) = vol * [c^+(U_in, normal) + c^-(U_out, normal)] U_in c^+(U_in, normal) = f(0)⋅normal + ∫ max(f'(s)⋅normal, 0) ds s=0 U_out c^-(U_out, normal) = ∫ min(f'(s)⋅normal, 0) ds s=0 Parameters ---------- flux |Function| defining the analytical flux `f`. flux_derivative |Function| defining the analytical flux derivative `f'`. gausspoints Number of Gauss quadrature points to be used for integration. intervals Number of subintervals to be used for integration. """ def __init__(self, flux, flux_derivative, gausspoints=5, intervals=1): self.__auto_init(locals()) points, weights = GaussQuadratures.quadrature(npoints=self.gausspoints) points = points / intervals points = ((np.arange(self.intervals, dtype=np.float)[:, np.newaxis] * (1 / intervals)) + points[np.newaxis, :]).ravel() weights = np.tile(weights, intervals) * (1 / intervals) self.points = points self.weights = weights def evaluate_stage1(self, U, mu=None): int_els = np.abs(U)[:, np.newaxis, np.newaxis] return [np.concatenate([self.flux_derivative(U[:, np.newaxis] * p, mu)[:, np.newaxis, :] * int_els * w for p, w in zip(self.points, self.weights)], axis=1)] def evaluate_stage2(self, stage1_data, unit_outer_normals, volumes, mu=None): F0 = np.sum(self.flux.evaluate(np.array([[0.]]), mu=mu) * unit_outer_normals, axis=1) Fs = np.sum(stage1_data[0] * unit_outer_normals[:, np.newaxis, np.newaxis, :], axis=3) Fs[:, 0, :] = np.maximum(Fs[:, 0, :], 0) Fs[:, 1, :] = np.minimum(Fs[:, 1, :], 0) Fs = np.sum(np.sum(Fs, axis=2), axis=1) + F0 Fs *= volumes return Fs
[docs]@defaults('delta') def jacobian_options(delta=1e-7): return {'delta': delta}
[docs]class NonlinearAdvectionOperator(Operator): """Nonlinear finite volume advection |Operator|. The operator is of the form :: L(u, mu)(x) = ∇ ⋅ f(u(x), mu) Parameters ---------- grid |Grid| for which to evaluate the operator. boundary_info |BoundaryInfo| determining the Dirichlet and Neumann boundaries. numerical_flux The :class:`NumericalConvectiveFlux <NumericalConvectiveFlux>` to use. dirichlet_data |Function| providing the Dirichlet boundary values. If `None`, constant-zero boundary is assumed. solver_options The |solver_options| for the operator. name The name of the operator. """ linear = False def __init__(self, grid, boundary_info, numerical_flux, dirichlet_data=None, solver_options=None, space_id='STATE', name=None): assert dirichlet_data is None or isinstance(dirichlet_data, Function) self.__auto_init(locals()) if (isinstance(dirichlet_data, Function) and boundary_info.has_dirichlet and not dirichlet_data.parametric): self._dirichlet_values = self.dirichlet_data(grid.centers(1)[boundary_info.dirichlet_boundaries(1)]) self._dirichlet_values = self._dirichlet_values.ravel() self._dirichlet_values_flux_shaped = self._dirichlet_values.reshape((-1, 1)) self.source = self.range = FVVectorSpace(grid, space_id) def with_numerical_flux(self, **kwargs): return self.with_(numerical_flux=self.numerical_flux.with_(**kwargs))
[docs] def restricted(self, dofs): source_dofs = np.setdiff1d(np.union1d(self.grid.neighbours(0, 0)[dofs].ravel(), dofs), np.array([-1], dtype=np.int32), assume_unique=True) sub_grid = SubGrid(self.grid, source_dofs) sub_boundary_info = make_sub_grid_boundary_info(sub_grid, self.grid, self.boundary_info) op = self.with_(grid=sub_grid, boundary_info=sub_boundary_info, space_id=None, name=f'{self.name}_restricted') sub_grid_indices = sub_grid.indices_from_parent_indices(dofs, codim=0) proj = ComponentProjection(sub_grid_indices, op.range) return proj @ op, sub_grid.parent_indices(0)
def _fetch_grid_data(self): # pre-fetch all grid-associated data to avoid searching the cache for each operator application g = self.grid bi = self.boundary_info self._grid_data = dict(SUPE=g.superentities(1, 0), SUPI=g.superentity_indices(1, 0), VOLS0=g.volumes(0), VOLS1=g.volumes(1), BOUNDARIES=g.boundaries(1), CENTERS=g.centers(1), DIRICHLET_BOUNDARIES=bi.dirichlet_boundaries(1) if bi.has_dirichlet else None, NEUMANN_BOUNDARIES=bi.neumann_boundaries(1) if bi.has_neumann else None) self._grid_data.update(UNIT_OUTER_NORMALS=g.unit_outer_normals()[self._grid_data['SUPE'][:, 0], self._grid_data['SUPI'][:, 0]])
[docs] def apply(self, U, mu=None): assert U in self.source assert self.parameters.assert_compatible(mu) if not hasattr(self, '_grid_data'): self._fetch_grid_data() U = U.to_numpy() R = np.zeros((len(U), self.source.dim)) bi = self.boundary_info gd = self._grid_data SUPE = gd['SUPE'] VOLS0 = gd['VOLS0'] VOLS1 = gd['VOLS1'] BOUNDARIES = gd['BOUNDARIES'] CENTERS = gd['CENTERS'] DIRICHLET_BOUNDARIES = gd['DIRICHLET_BOUNDARIES'] NEUMANN_BOUNDARIES = gd['NEUMANN_BOUNDARIES'] UNIT_OUTER_NORMALS = gd['UNIT_OUTER_NORMALS'] if bi.has_dirichlet: if hasattr(self, '_dirichlet_values'): dirichlet_values = self._dirichlet_values elif self.dirichlet_data is not None: dirichlet_values = self.dirichlet_data(CENTERS[DIRICHLET_BOUNDARIES], mu=mu) else: dirichlet_values = np.zeros_like(DIRICHLET_BOUNDARIES) F_dirichlet = self.numerical_flux.evaluate_stage1(dirichlet_values, mu) for i, j in enumerate(range(len(U))): Ui = U[j] Ri = R[i] F = self.numerical_flux.evaluate_stage1(Ui, mu) F_edge = [f[SUPE] for f in F] for f in F_edge: f[BOUNDARIES, 1] = f[BOUNDARIES, 0] if bi.has_dirichlet: for f, f_d in zip(F_edge, F_dirichlet): f[DIRICHLET_BOUNDARIES, 1] = f_d NUM_FLUX = self.numerical_flux.evaluate_stage2(F_edge, UNIT_OUTER_NORMALS, VOLS1, mu) if bi.has_neumann: NUM_FLUX[NEUMANN_BOUNDARIES] = 0 iadd_masked(Ri, NUM_FLUX, SUPE[:, 0]) isub_masked(Ri, NUM_FLUX, SUPE[:, 1]) R /= VOLS0 return self.range.make_array(R)
[docs] def jacobian(self, U, mu=None): assert U in self.source and len(U) == 1 assert self.parameters.assert_compatible(mu) if not hasattr(self, '_grid_data'): self._fetch_grid_data() U = U.to_numpy().ravel() g = self.grid bi = self.boundary_info gd = self._grid_data SUPE = gd['SUPE'] VOLS0 = gd['VOLS0'] VOLS1 = gd['VOLS1'] BOUNDARIES = gd['BOUNDARIES'] CENTERS = gd['CENTERS'] DIRICHLET_BOUNDARIES = gd['DIRICHLET_BOUNDARIES'] NEUMANN_BOUNDARIES = gd['NEUMANN_BOUNDARIES'] UNIT_OUTER_NORMALS = gd['UNIT_OUTER_NORMALS'] INNER = np.setdiff1d(np.arange(g.size(1)), BOUNDARIES) solver_options = self.solver_options delta = solver_options.get('jacobian_delta') if solver_options else None if delta is None: delta = jacobian_options()['delta'] if bi.has_dirichlet: if hasattr(self, '_dirichlet_values'): dirichlet_values = self._dirichlet_values elif self.dirichlet_data is not None: dirichlet_values = self.dirichlet_data(CENTERS[DIRICHLET_BOUNDARIES], mu=mu) else: dirichlet_values = np.zeros_like(DIRICHLET_BOUNDARIES) F_dirichlet = self.numerical_flux.evaluate_stage1(dirichlet_values, mu) UP = U + delta UM = U - delta F = self.numerical_flux.evaluate_stage1(U, mu) FP = self.numerical_flux.evaluate_stage1(UP, mu) FM = self.numerical_flux.evaluate_stage1(UM, mu) del UP, UM F_edge = [f[SUPE] for f in F] FP_edge = [f[SUPE] for f in FP] FM_edge = [f[SUPE] for f in FM] del F, FP, FM F0P_edge = [f.copy() for f in F_edge] for f, ff in zip(F0P_edge, FP_edge): f[:, 0] = ff[:, 0] f[BOUNDARIES, 1] = f[BOUNDARIES, 0] if bi.has_dirichlet: for f, f_d in zip(F0P_edge, F_dirichlet): f[DIRICHLET_BOUNDARIES, 1] = f_d NUM_FLUX_0P = self.numerical_flux.evaluate_stage2(F0P_edge, UNIT_OUTER_NORMALS, VOLS1, mu) del F0P_edge F0M_edge = [f.copy() for f in F_edge] for f, ff in zip(F0M_edge, FM_edge): f[:, 0] = ff[:, 0] f[BOUNDARIES, 1] = f[BOUNDARIES, 0] if bi.has_dirichlet: for f, f_d in zip(F0M_edge, F_dirichlet): f[DIRICHLET_BOUNDARIES, 1] = f_d NUM_FLUX_0M = self.numerical_flux.evaluate_stage2(F0M_edge, UNIT_OUTER_NORMALS, VOLS1, mu) del F0M_edge D_NUM_FLUX_0 = (NUM_FLUX_0P - NUM_FLUX_0M) D_NUM_FLUX_0 /= (2 * delta) if bi.has_neumann: D_NUM_FLUX_0[NEUMANN_BOUNDARIES] = 0 del NUM_FLUX_0P, NUM_FLUX_0M F1P_edge = [f.copy() for f in F_edge] for f, ff in zip(F1P_edge, FP_edge): f[:, 1] = ff[:, 1] f[BOUNDARIES, 1] = f[BOUNDARIES, 0] if bi.has_dirichlet: for f, f_d in zip(F1P_edge, F_dirichlet): f[DIRICHLET_BOUNDARIES, 1] = f_d NUM_FLUX_1P = self.numerical_flux.evaluate_stage2(F1P_edge, UNIT_OUTER_NORMALS, VOLS1, mu) del F1P_edge, FP_edge F1M_edge = F_edge for f, ff in zip(F1M_edge, FM_edge): f[:, 1] = ff[:, 1] f[BOUNDARIES, 1] = f[BOUNDARIES, 0] if bi.has_dirichlet: for f, f_d in zip(F1M_edge, F_dirichlet): f[DIRICHLET_BOUNDARIES, 1] = f_d NUM_FLUX_1M = self.numerical_flux.evaluate_stage2(F1M_edge, UNIT_OUTER_NORMALS, VOLS1, mu) del F1M_edge, FM_edge D_NUM_FLUX_1 = (NUM_FLUX_1P - NUM_FLUX_1M) D_NUM_FLUX_1 /= (2 * delta) if bi.has_neumann: D_NUM_FLUX_1[NEUMANN_BOUNDARIES] = 0 del NUM_FLUX_1P, NUM_FLUX_1M I1 = np.hstack([SUPE[INNER, 0], SUPE[INNER, 0], SUPE[INNER, 1], SUPE[INNER, 1], SUPE[BOUNDARIES, 0]]) I0 = np.hstack([SUPE[INNER, 0], SUPE[INNER, 1], SUPE[INNER, 0], SUPE[INNER, 1], SUPE[BOUNDARIES, 0]]) V = np.hstack([D_NUM_FLUX_0[INNER], -D_NUM_FLUX_0[INNER], D_NUM_FLUX_1[INNER], -D_NUM_FLUX_1[INNER], D_NUM_FLUX_0[BOUNDARIES]]) A = coo_matrix((V, (I0, I1)), shape=(g.size(0), g.size(0))) A = csc_matrix(A).copy() # See pymor.discretizers.builtin.cg.DiffusionOperatorP1 for why copy() is necessary A = dia_matrix(([1. / VOLS0], [0]), shape=(g.size(0),) * 2) * A return NumpyMatrixOperator(A, source_id=self.source.id, range_id=self.range.id)
[docs]def nonlinear_advection_lax_friedrichs_operator(grid, boundary_info, flux, lxf_lambda=1.0, dirichlet_data=None, solver_options=None, name=None): """Instantiate a :class:`NonlinearAdvectionOperator` using :class:`LaxFriedrichsFlux`.""" num_flux = LaxFriedrichsFlux(flux, lxf_lambda) return NonlinearAdvectionOperator(grid, boundary_info, num_flux, dirichlet_data, solver_options, name=name)
[docs]def nonlinear_advection_simplified_engquist_osher_operator(grid, boundary_info, flux, flux_derivative, dirichlet_data=None, solver_options=None, name=None): """Instantiate a :class:`NonlinearAdvectionOperator` using :class:`SimplifiedEngquistOsherFlux`.""" num_flux = SimplifiedEngquistOsherFlux(flux, flux_derivative) return NonlinearAdvectionOperator(grid, boundary_info, num_flux, dirichlet_data, solver_options, name=name)
[docs]def nonlinear_advection_engquist_osher_operator(grid, boundary_info, flux, flux_derivative, gausspoints=5, intervals=1, dirichlet_data=None, solver_options=None, name=None): """Instantiate a :class:`NonlinearAdvectionOperator` using :class:`EngquistOsherFlux`.""" num_flux = EngquistOsherFlux(flux, flux_derivative, gausspoints=gausspoints, intervals=intervals) return NonlinearAdvectionOperator(grid, boundary_info, num_flux, dirichlet_data, solver_options, name=name)
[docs]class LinearAdvectionLaxFriedrichs(NumpyMatrixBasedOperator): """Linear advection finite Volume |Operator| using Lax-Friedrichs flux. The operator is of the form :: L(u, mu)(x) = ∇ ⋅ (v(x, mu)⋅u(x)) See :class:`LaxFriedrichsFlux` for the definition of the Lax-Friedrichs flux. Parameters ---------- grid |Grid| over which to assemble the operator. boundary_info |BoundaryInfo| determining the Dirichlet and Neumann boundaries. velocity_field |Function| defining the velocity field `v`. lxf_lambda The stabilization parameter `λ`. solver_options The |solver_options| for the operator. name The name of the operator. """ def __init__(self, grid, boundary_info, velocity_field, lxf_lambda=1.0, solver_options=None, name=None): self.__auto_init(locals()) self.source = self.range = FVVectorSpace(grid) def _assemble(self, mu=None): g = self.grid bi = self.boundary_info SUPE = g.superentities(1, 0) SUPI = g.superentity_indices(1, 0) assert SUPE.ndim == 2 edge_volumes = g.volumes(1) boundary_edges = g.boundaries(1) inner_edges = np.setdiff1d(np.arange(g.size(1)), boundary_edges) dirichlet_edges = bi.dirichlet_boundaries(1) if bi.has_dirichlet else np.array([], ndmin=1, dtype=np.int) neumann_edges = bi.neumann_boundaries(1) if bi.has_neumann else np.array([], ndmin=1, dtype=np.int) outflow_edges = np.setdiff1d(boundary_edges, np.hstack([dirichlet_edges, neumann_edges])) normal_velocities = np.einsum('ei,ei->e', self.velocity_field(g.centers(1), mu=mu), g.unit_outer_normals()[SUPE[:, 0], SUPI[:, 0]]) nv_inner = normal_velocities[inner_edges] l_inner = np.ones_like(nv_inner) * (1. / self.lxf_lambda) I0_inner = np.hstack([SUPE[inner_edges, 0], SUPE[inner_edges, 0], SUPE[inner_edges, 1], SUPE[inner_edges, 1]]) I1_inner = np.hstack([SUPE[inner_edges, 0], SUPE[inner_edges, 1], SUPE[inner_edges, 0], SUPE[inner_edges, 1]]) V_inner = np.hstack([nv_inner, nv_inner, -nv_inner, -nv_inner]) V_inner += np.hstack([l_inner, -l_inner, -l_inner, l_inner]) V_inner *= np.tile(0.5 * edge_volumes[inner_edges], 4) I_out = SUPE[outflow_edges, 0] V_out = edge_volumes[outflow_edges] * normal_velocities[outflow_edges] I_dir = SUPE[dirichlet_edges, 0] V_dir = edge_volumes[dirichlet_edges] * (0.5 * normal_velocities[dirichlet_edges] + 0.5 / self.lxf_lambda) I0 = np.hstack([I0_inner, I_out, I_dir]) I1 = np.hstack([I1_inner, I_out, I_dir]) V = np.hstack([V_inner, V_out, V_dir]) A = coo_matrix((V, (I0, I1)), shape=(g.size(0), g.size(0))) A = csc_matrix(A).copy() # See pymor.discretizers.builtin.cg.DiffusionOperatorP1 for why copy() is necessary A = dia_matrix(([1. / g.volumes(0)], [0]), shape=(g.size(0),) * 2) * A return A
[docs]class L2Product(NumpyMatrixBasedOperator): """|Operator| representing the L2-product between finite volume functions. Parameters ---------- grid The |Grid| for which to assemble the product. solver_options The |solver_options| for the operator. name The name of the product. """ sparse = True def __init__(self, grid, solver_options=None, name=None): self.__auto_init(locals()) self.source = self.range = FVVectorSpace(grid) def _assemble(self, mu=None): A = dia_matrix((self.grid.volumes(0), [0]), shape=(self.grid.size(0),) * 2) return A
[docs]class ReactionOperator(NumpyMatrixBasedOperator): """Finite Volume reaction |Operator|. The operator is of the form :: L(u, mu)(x) = c(x, mu)⋅u(x) Parameters ---------- grid The |Grid| for which to assemble the operator. reaction_coefficient The function 'c' solver_options The |solver_options| for the operator. name The name of the operator. """ sparse = True def __init__(self, grid, reaction_coefficient, solver_options=None, name=None): assert reaction_coefficient.dim_domain == grid.dim and reaction_coefficient.shape_range == () self.__auto_init(locals()) self.source = self.range = FVVectorSpace(grid) def _assemble(self, mu=None): A = dia_matrix((self.reaction_coefficient.evaluate(self.grid.centers(0), mu=mu), [0]), shape=(self.grid.size(0),) * 2) return A
[docs]class NonlinearReactionOperator(Operator): linear = False def __init__(self, grid, reaction_function, reaction_function_derivative=None, space_id='STATE', name=None): self.__auto_init(locals()) self.source = self.range = FVVectorSpace(grid, space_id)
[docs] def apply(self, U, ind=None, mu=None): assert U in self.source R = U.to_numpy() if ind is None else U.to_numpy()[ind] R = self.reaction_function.evaluate(R.reshape(R.shape + (1,)), mu=mu) return self.range.make_array(R)
[docs] def jacobian(self, U, mu=None): if self.reaction_function_derivative is None: raise NotImplementedError U = U.to_numpy() A = dia_matrix((self.reaction_function_derivative.evaluate(U.reshape(U.shape + (1,)), mu=mu), [0]), shape=(self.grid.size(0),) * 2) return NumpyMatrixOperator(A, source_id=self.source.id, range_id=self.range.id)
[docs]class L2ProductFunctional(NumpyMatrixBasedOperator): """Finite volume functional representing the inner product with an L2-|Function|. Additionally, boundary conditions can be enforced by providing `dirichlet_data` and `neumann_data` functions. Parameters ---------- grid |Grid| for which to assemble the functional. function The |Function| with which to take the inner product or `None`. boundary_info |BoundaryInfo| determining the Dirichlet and Neumann boundaries or `None`. If `None`, no boundary treatment is performed. dirichlet_data |Function| providing the Dirichlet boundary values. If `None`, constant-zero boundary is assumed. diffusion_function See :class:`DiffusionOperator`. Has to be specified in case `dirichlet_data` is given. diffusion_constant See :class:`DiffusionOperator`. Has to be specified in case `dirichlet_data` is given. neumann_data |Function| providing the Neumann boundary values. If `None`, constant-zero is assumed. order Order of the Gauss quadrature to use for numerical integration. name The name of the functional. """ source = NumpyVectorSpace(1) sparse = False def __init__(self, grid, function=None, boundary_info=None, dirichlet_data=None, diffusion_function=None, diffusion_constant=None, neumann_data=None, order=1, name=None): assert function is None or function.shape_range == () self.__auto_init(locals()) self.range = FVVectorSpace(grid) def _assemble(self, mu=None): g = self.grid bi = self.boundary_info if self.function is not None: # evaluate function at all quadrature points -> shape = (g.size(0), number of quadrature points, 1) F = self.function(g.quadrature_points(0, order=self.order), mu=mu) _, w = g.reference_element.quadrature(order=self.order) # integrate the products of the function with the shape functions on each element # -> shape = (g.size(0), number of shape functions) F_INTS = np.einsum('ei,e,i->e', F, g.integration_elements(0), w).ravel() else: F_INTS = np.zeros(g.size(0)) if bi is not None and (bi.has_dirichlet and self.dirichlet_data is not None or bi.has_neumann and self.neumann_data): centers = g.centers(1) superentities = g.superentities(1, 0) superentity_indices = g.superentity_indices(1, 0) SE_I0 = superentities[:, 0] VOLS = g.volumes(1) FLUXES = np.zeros(g.size(1)) if bi.has_dirichlet and self.dirichlet_data is not None: dirichlet_mask = bi.dirichlet_mask(1) SE_I0_D = SE_I0[dirichlet_mask] boundary_normals = g.unit_outer_normals()[SE_I0_D, superentity_indices[:, 0][dirichlet_mask]] BOUNDARY_DISTS = np.sum((centers[dirichlet_mask, :] - g.orthogonal_centers()[SE_I0_D, :]) * boundary_normals, axis=-1) DIRICHLET_FLUXES = VOLS[dirichlet_mask] * self.dirichlet_data(centers[dirichlet_mask], mu=mu) / BOUNDARY_DISTS if self.diffusion_function is not None: DIRICHLET_FLUXES *= self.diffusion_function(centers[dirichlet_mask], mu=mu) if self.diffusion_constant is not None: DIRICHLET_FLUXES *= self.diffusion_constant FLUXES[dirichlet_mask] = DIRICHLET_FLUXES if bi.has_neumann and self.neumann_data is not None: neumann_mask = bi.neumann_mask(1) FLUXES[neumann_mask] -= VOLS[neumann_mask] * self.neumann_data(centers[neumann_mask], mu=mu) F_INTS += np.bincount(SE_I0, weights=FLUXES, minlength=len(F_INTS)) F_INTS /= g.volumes(0) return F_INTS.reshape((-1, 1))
[docs]class DiffusionOperator(NumpyMatrixBasedOperator): """Finite Volume Diffusion |Operator|. The operator is of the form :: (Lu)(x) = c ∇ ⋅ [ d(x) ∇ u(x) ] Parameters ---------- grid The |Grid| over which to assemble the operator. boundary_info |BoundaryInfo| for the treatment of Dirichlet boundary conditions. diffusion_function The scalar-valued |Function| `d(x)`. If `None`, constant one is assumed. diffusion_constant The constant `c`. If `None`, `c` is set to one. solver_options The |solver_options| for the operator. name Name of the operator. """ sparse = True def __init__(self, grid, boundary_info, diffusion_function=None, diffusion_constant=None, solver_options=None, name=None): super().__init__() assert isinstance(grid, AffineGridWithOrthogonalCenters) assert (diffusion_function is None or (isinstance(diffusion_function, Function) and diffusion_function.dim_domain == grid.dim and diffusion_function.shape_range == ())) self.__auto_init(locals()) self.source = self.range = FVVectorSpace(grid) def _assemble(self, mu=None): grid = self.grid # compute the local coordinates of the codim-1 subentity centers in the reference element reference_element = grid.reference_element(0) subentity_embedding = reference_element.subentity_embedding(1) subentity_centers = (np.einsum('eij,j->ei', subentity_embedding[0], reference_element.sub_reference_element(1).center()) + subentity_embedding[1]) # compute shift for periodic boundaries embeddings = grid.embeddings(0) superentities = grid.superentities(1, 0) superentity_indices = grid.superentity_indices(1, 0) boundary_mask = grid.boundary_mask(1) inner_mask = ~boundary_mask SE_I0 = superentities[:, 0] SE_I1 = superentities[:, 1] SE_I0_I = SE_I0[inner_mask] SE_I1_I = SE_I1[inner_mask] SHIFTS = (np.einsum('eij,ej->ei', embeddings[0][SE_I0_I, :, :], subentity_centers[superentity_indices[:, 0][inner_mask]]) + embeddings[1][SE_I0_I, :]) SHIFTS -= (np.einsum('eij,ej->ei', embeddings[0][SE_I1_I, :, :], subentity_centers[superentity_indices[:, 1][inner_mask]]) + embeddings[1][SE_I1_I, :]) # comute distances for gradient approximations centers = grid.centers(1) orthogonal_centers = grid.orthogonal_centers() VOLS = grid.volumes(1) INNER_DISTS = np.linalg.norm(orthogonal_centers[SE_I0_I, :] - orthogonal_centers[SE_I1_I, :] - SHIFTS, axis=1) del SHIFTS # assemble matrix FLUXES = VOLS[inner_mask] / INNER_DISTS if self.diffusion_function is not None: FLUXES *= self.diffusion_function(centers[inner_mask], mu=mu) if self.diffusion_constant is not None: FLUXES *= self.diffusion_constant del INNER_DISTS FLUXES = np.concatenate((-FLUXES, -FLUXES, FLUXES, FLUXES)) FLUXES_I0 = np.concatenate((SE_I0_I, SE_I1_I, SE_I0_I, SE_I1_I)) FLUXES_I1 = np.concatenate((SE_I1_I, SE_I0_I, SE_I0_I, SE_I1_I)) if self.boundary_info.has_dirichlet: dirichlet_mask = self.boundary_info.dirichlet_mask(1) SE_I0_D = SE_I0[dirichlet_mask] boundary_normals = grid.unit_outer_normals()[SE_I0_D, superentity_indices[:, 0][dirichlet_mask]] BOUNDARY_DISTS = np.sum((centers[dirichlet_mask, :] - orthogonal_centers[SE_I0_D, :]) * boundary_normals, axis=-1) DIRICHLET_FLUXES = VOLS[dirichlet_mask] / BOUNDARY_DISTS if self.diffusion_function is not None: DIRICHLET_FLUXES *= self.diffusion_function(centers[dirichlet_mask], mu=mu) if self.diffusion_constant is not None: DIRICHLET_FLUXES *= self.diffusion_constant FLUXES = np.concatenate((FLUXES, DIRICHLET_FLUXES)) FLUXES_I0 = np.concatenate((FLUXES_I0, SE_I0_D)) FLUXES_I1 = np.concatenate((FLUXES_I1, SE_I0_D)) A = coo_matrix((FLUXES, (FLUXES_I0, FLUXES_I1)), shape=(self.source.dim, self.source.dim)) A = (dia_matrix(([1. / grid.volumes(0)], [0]), shape=(grid.size(0),) * 2) * A).tocsc() return A
[docs]def discretize_stationary_fv(analytical_problem, diameter=None, domain_discretizer=None, grid_type=None, num_flux='lax_friedrichs', lxf_lambda=1., eo_gausspoints=5, eo_intervals=1, grid=None, boundary_info=None, preassemble=True): """Discretizes a |StationaryProblem| using the finite volume method. Parameters ---------- analytical_problem The |StationaryProblem| to discretize. diameter If not `None`, `diameter` is passed as an argument to the `domain_discretizer`. domain_discretizer Discretizer to be used for discretizing the analytical domain. This has to be a function `domain_discretizer(domain_description, diameter, ...)`. If `None`, |discretize_domain_default| is used. grid_type If not `None`, this parameter is forwarded to `domain_discretizer` to specify the type of the generated |Grid|. num_flux The numerical flux to use in the finite volume formulation. Allowed values are `'lax_friedrichs'`, `'engquist_osher'`, `'simplified_engquist_osher'` (see :mod:`pymor.discretizers.builtin.fv`). lxf_lambda The stabilization parameter for the Lax-Friedrichs numerical flux (ignored, if different flux is chosen). eo_gausspoints Number of Gauss points for the Engquist-Osher numerical flux (ignored, if different flux is chosen). eo_intervals Number of sub-intervals to use for integration when using Engquist-Osher numerical flux (ignored, if different flux is chosen). grid Instead of using a domain discretizer, the |Grid| can also be passed directly using this parameter. boundary_info A |BoundaryInfo| specifying the boundary types of the grid boundary entities. Must be provided if `grid` is specified. preassemble If `True`, preassemble all operators in the resulting |Model|. Returns ------- m The |Model| that has been generated. data Dictionary with the following entries: :grid: The generated |Grid|. :boundary_info: The generated |BoundaryInfo|. :unassembled_m: In case `preassemble` is `True`, the generated |Model| before preassembling operators. """ assert isinstance(analytical_problem, StationaryProblem) assert grid is None or boundary_info is not None assert boundary_info is None or grid is not None assert grid is None or domain_discretizer is None assert grid_type is None or grid is None p = analytical_problem if analytical_problem.robin_data is not None: raise NotImplementedError if p.outputs: raise NotImplementedError if grid is None: domain_discretizer = domain_discretizer or discretize_domain_default if grid_type: domain_discretizer = partial(domain_discretizer, grid_type=grid_type) if diameter is None: grid, boundary_info = domain_discretizer(analytical_problem.domain) else: grid, boundary_info = domain_discretizer(analytical_problem.domain, diameter=diameter) L, L_coefficients = [], [] F, F_coefficients = [], [] if p.rhs is not None or p.neumann_data is not None: F += [L2ProductFunctional(grid, p.rhs, boundary_info=boundary_info, neumann_data=p.neumann_data)] F_coefficients += [1.] # diffusion part if isinstance(p.diffusion, LincombFunction): L += [DiffusionOperator(grid, boundary_info, diffusion_function=df, name=f'diffusion_{i}') for i, df in enumerate(p.diffusion.functions)] L_coefficients += p.diffusion.coefficients if p.dirichlet_data is not None: F += [L2ProductFunctional(grid, None, boundary_info=boundary_info, dirichlet_data=p.dirichlet_data, diffusion_function=df, name=f'dirichlet_{i}') for i, df in enumerate(p.diffusion.functions)] F_coefficients += p.diffusion.coefficients elif p.diffusion is not None: L += [DiffusionOperator(grid, boundary_info, diffusion_function=p.diffusion, name='diffusion')] L_coefficients += [1.] if p.dirichlet_data is not None: F += [L2ProductFunctional(grid, None, boundary_info=boundary_info, dirichlet_data=p.dirichlet_data, diffusion_function=p.diffusion, name='dirichlet')] F_coefficients += [1.] # advection part if isinstance(p.advection, LincombFunction): L += [LinearAdvectionLaxFriedrichs(grid, boundary_info, af, name=f'advection_{i}') for i, af in enumerate(p.advection.functions)] L_coefficients += list(p.advection.coefficients) elif p.advection is not None: L += [LinearAdvectionLaxFriedrichs(grid, boundary_info, p.advection, name='advection')] L_coefficients.append(1.) # nonlinear advection part if p.nonlinear_advection is not None: if num_flux == 'lax_friedrichs': L += [nonlinear_advection_lax_friedrichs_operator(grid, boundary_info, p.nonlinear_advection, dirichlet_data=p.dirichlet_data, lxf_lambda=lxf_lambda)] elif num_flux == 'engquist_osher': L += [nonlinear_advection_engquist_osher_operator(grid, boundary_info, p.nonlinear_advection, p.nonlinear_advection_derivative, gausspoints=eo_gausspoints, intervals=eo_intervals, dirichlet_data=p.dirichlet_data)] elif num_flux == 'simplified_engquist_osher': L += [nonlinear_advection_simplified_engquist_osher_operator(grid, boundary_info, p.nonlinear_advection, p.nonlinear_advection_derivative, dirichlet_data=p.dirichlet_data)] else: raise NotImplementedError L_coefficients.append(1.) # reaction part if isinstance(p.reaction, LincombFunction): raise NotImplementedError elif p.reaction is not None: L += [ReactionOperator(grid, p.reaction, name='reaction')] L_coefficients += [1.] # nonlinear reaction part if p.nonlinear_reaction is not None: L += [NonlinearReactionOperator(grid, p.nonlinear_reaction, p.nonlinear_reaction_derivative)] L_coefficients += [1.] # system operator if len(L_coefficients) == 1 and L_coefficients[0] == 1.: L = L[0] else: L = LincombOperator(operators=L, coefficients=L_coefficients, name='elliptic_operator') # rhs if len(F_coefficients) == 0: F = ZeroOperator(L.range, NumpyVectorSpace(1)) elif len(F_coefficients) == 1 and F_coefficients[0] == 1.: F = F[0] else: F = LincombOperator(operators=F, coefficients=F_coefficients, name='rhs') if grid.reference_element in (triangle, square): visualizer = PatchVisualizer(grid=grid, bounding_box=grid.bounding_box(), codim=0) elif grid.reference_element is line: visualizer = OnedVisualizer(grid=grid, codim=0) else: visualizer = None l2_product = L2Product(grid, name='l2') products = {'l2': l2_product} m = StationaryModel(L, F, products=products, visualizer=visualizer, name=f'{p.name}_FV') data = {'grid': grid, 'boundary_info': boundary_info} if preassemble: data['unassembled_m'] = m m = preassemble_(m) return m, data
[docs]def discretize_instationary_fv(analytical_problem, diameter=None, domain_discretizer=None, grid_type=None, num_flux='lax_friedrichs', lxf_lambda=1., eo_gausspoints=5, eo_intervals=1, grid=None, boundary_info=None, num_values=None, time_stepper=None, nt=None, preassemble=True): """Discretizes an |InstationaryProblem| with a |StationaryProblem| as stationary part using the finite volume method. Parameters ---------- analytical_problem The |InstationaryProblem| to discretize. diameter If not `None`, `diameter` is passed to the `domain_discretizer`. domain_discretizer Discretizer to be used for discretizing the analytical domain. This has to be a function `domain_discretizer(domain_description, diameter, ...)`. If further arguments should be passed to the discretizer, use :func:`functools.partial`. If `None`, |discretize_domain_default| is used. grid_type If not `None`, this parameter is forwarded to `domain_discretizer` to specify the type of the generated |Grid|. num_flux The numerical flux to use in the finite volume formulation. Allowed values are `'lax_friedrichs'`, `'engquist_osher'`, `'simplified_engquist_osher'` (see :mod:`pymor.discretizers.builtin.fv`). lxf_lambda The stabilization parameter for the Lax-Friedrichs numerical flux (ignored, if different flux is chosen). eo_gausspoints Number of Gauss points for the Engquist-Osher numerical flux (ignored, if different flux is chosen). eo_intervals Number of sub-intervals to use for integration when using Engquist-Osher numerical flux (ignored, if different flux is chosen). grid Instead of using a domain discretizer, the |Grid| can also be passed directly using this parameter. boundary_info A |BoundaryInfo| specifying the boundary types of the grid boundary entities. Must be provided if `grid` is specified. num_values The number of returned vectors of the solution trajectory. If `None`, each intermediate vector that is calculated is returned. time_stepper The :class:`time-stepper <pymor.algorithms.timestepping.TimeStepper>` to be used by :class:`~pymor.models.basic.InstationaryModel.solve`. nt If `time_stepper` is not specified, the number of time steps for implicit Euler time stepping. preassemble If `True`, preassemble all operators in the resulting |Model|. Returns ------- m The |Model| that has been generated. data Dictionary with the following entries: :grid: The generated |Grid|. :boundary_info: The generated |BoundaryInfo|. :unassembled_m: In case `preassemble` is `True`, the generated |Model| before preassembling operators. """ assert isinstance(analytical_problem, InstationaryProblem) assert isinstance(analytical_problem.stationary_part, StationaryProblem) assert grid is None or boundary_info is not None assert boundary_info is None or grid is not None assert grid is None or domain_discretizer is None assert (time_stepper is None) != (nt is None) p = analytical_problem m, data = discretize_stationary_fv(p.stationary_part, diameter=diameter, domain_discretizer=domain_discretizer, grid_type=grid_type, num_flux=num_flux, lxf_lambda=lxf_lambda, eo_gausspoints=eo_gausspoints, eo_intervals=eo_intervals, grid=grid, boundary_info=boundary_info) grid = data['grid'] if p.initial_data.parametric: def initial_projection(U, mu): I = p.initial_data.evaluate(grid.quadrature_points(0, order=2), mu).squeeze() I = np.sum(I * grid.reference_element.quadrature(order=2)[1], axis=1) * (1. / grid.reference_element.volume) I = m.solution_space.make_array(I) return I.lincomb(U).to_numpy() I = NumpyGenericOperator(initial_projection, dim_range=grid.size(0), linear=True, range_id=m.solution_space.id, parameters=p.initial_data.parameters) else: I = p.initial_data.evaluate(grid.quadrature_points(0, order=2)).squeeze() I = np.sum(I * grid.reference_element.quadrature(order=2)[1], axis=1) * (1. / grid.reference_element.volume) I = m.solution_space.make_array(I) if time_stepper is None: if p.stationary_part.diffusion is None: time_stepper = ExplicitEulerTimeStepper(nt=nt) else: time_stepper = ImplicitEulerTimeStepper(nt=nt) rhs = None if isinstance(m.rhs, ZeroOperator) else m.rhs m = InstationaryModel(operator=m.operator, rhs=rhs, mass=None, initial_data=I, T=p.T, products=m.products, time_stepper=time_stepper, visualizer=m.visualizer, num_values=num_values, name=f'{p.name}_FV') if preassemble: data['unassembled_m'] = m m = preassemble_(m) return m, data