# This file is part of the pyMOR project (http://www.pymor.org).
# Copyright 2013-2019 pyMOR developers and contributors. All rights reserved.
# License: BSD 2-Clause License (http://opensource.org/licenses/BSD-2-Clause)
""" This module provides some operators for finite volume discretizations."""
import numpy as np
from scipy.sparse import coo_matrix, csc_matrix, dia_matrix
from pymor.core.defaults import defaults
from pymor.core.interfaces import ImmutableInterface, abstractmethod
from pymor.functions.interfaces import FunctionInterface
from pymor.grids.interfaces import AffineGridWithOrthogonalCentersInterface
from pymor.grids.subgrid import SubGrid, make_sub_grid_boundary_info
from pymor.operators.basic import OperatorBase
from pymor.operators.constructions import ComponentProjection
from pymor.operators.numpy import NumpyMatrixBasedOperator, NumpyMatrixOperator
from pymor.parameters.base import Parametric
from pymor.tools.inplace import iadd_masked, isub_masked
from pymor.tools.quadratures import GaussQuadratures
from pymor.vectorarrays.numpy import NumpyVectorSpace
[docs]def FVVectorSpace(grid, id='STATE'):
return NumpyVectorSpace(grid.size(0), id)
[docs]class NumericalConvectiveFluxInterface(ImmutableInterface, Parametric):
"""Interface for numerical convective fluxes for finite volume schemes.
Numerical fluxes defined by this interfaces are functions of
the form `F(U_inner, U_outer, unit_outer_normal, edge_volume, mu)`.
The flux evaluation is vectorized and happens in two stages:
1. `evaluate_stage1` receives a |NumPy array| `U` of all values which
appear as `U_inner` or `U_outer` for all edges the flux shall be
evaluated at and returns a `tuple` of |NumPy arrays|
each of the same length as `U`.
2. `evaluate_stage2` receives the reordered `stage1_data` for each
edge as well as the unit outer normal and the volume of the edges.
`stage1_data` is given as follows: If `R_l` is `l`-th entry of the
`tuple` returned by `evaluate_stage1`, the `l`-th entry `D_l` of
of the `stage1_data` tuple has the shape `(num_edges, 2) + R_l.shape[1:]`.
If for edge `k` the values `U_inner` and `U_outer` are the `i`-th
and `j`-th value in the `U` array provided to `evaluate_stage1`,
we have ::
D_l[k, 0] == R_l[i], D_l[k, 1] == R_l[j].
`evaluate_stage2` returns a |NumPy array| of the flux evaluations
for each edge.
"""
@abstractmethod
def evaluate_stage1(self, U, mu=None):
pass
@abstractmethod
def evaluate_stage2(self, stage1_data, unit_outer_normals, volumes, mu=None):
pass
[docs]class LaxFriedrichsFlux(NumericalConvectiveFluxInterface):
"""Lax-Friedrichs numerical flux.
If `f` is the analytical flux, the Lax-Friedrichs flux `F` is given
by::
F(U_in, U_out, normal, vol) = vol * [normal⋅(f(U_in) + f(U_out))/2 + (U_in - U_out)/(2*λ)]
Parameters
----------
flux
|Function| defining the analytical flux `f`.
lxf_lambda
The stabilization parameter `λ`.
"""
def __init__(self, flux, lxf_lambda=1.0):
self.__auto_init(locals())
self.build_parameter_type(flux)
def evaluate_stage1(self, U, mu=None):
return U, self.flux(U[..., np.newaxis], mu)
def evaluate_stage2(self, stage1_data, unit_outer_normals, volumes, mu=None):
U, F = stage1_data
return (np.sum(np.sum(F, axis=1) * unit_outer_normals, axis=1) * 0.5
+ (U[..., 0] - U[..., 1]) * (0.5 / self.lxf_lambda)) * volumes
[docs]class SimplifiedEngquistOsherFlux(NumericalConvectiveFluxInterface):
"""Engquist-Osher numerical flux. Simplified Implementation for special case.
For the definition of the Engquist-Osher flux see :class:`EngquistOsherFlux`.
This class provides a faster and more accurate implementation for the special
case that `f(0) == 0` and the derivative of `f` only changes sign at `0`.
Parameters
----------
flux
|Function| defining the analytical flux `f`.
flux_derivative
|Function| defining the analytical flux derivative `f'`.
"""
def __init__(self, flux, flux_derivative):
self.__auto_init(locals())
self.build_parameter_type(flux, flux_derivative)
def evaluate_stage1(self, U, mu=None):
return self.flux(U[..., np.newaxis], mu), self.flux_derivative(U[..., np.newaxis], mu)
def evaluate_stage2(self, stage1_data, unit_outer_normals, volumes, mu=None):
F_edge, F_d_edge = stage1_data
unit_outer_normals = unit_outer_normals[:, np.newaxis, :]
F_d_edge = np.sum(F_d_edge * unit_outer_normals, axis=2)
F_edge = np.sum(F_edge * unit_outer_normals, axis=2)
F_edge[:, 0] = np.where(np.greater_equal(F_d_edge[:, 0], 0), F_edge[:, 0], 0)
F_edge[:, 1] = np.where(np.less_equal(F_d_edge[:, 1], 0), F_edge[:, 1], 0)
F_edge = np.sum(F_edge, axis=1)
F_edge *= volumes
return F_edge
[docs]class EngquistOsherFlux(NumericalConvectiveFluxInterface):
"""Engquist-Osher numerical flux.
If `f` is the analytical flux, and `f'` its derivative, the Engquist-Osher flux is
given by::
F(U_in, U_out, normal, vol) = vol * [c^+(U_in, normal) + c^-(U_out, normal)]
U_in
c^+(U_in, normal) = f(0)⋅normal + ∫ max(f'(s)⋅normal, 0) ds
s=0
U_out
c^-(U_out, normal) = ∫ min(f'(s)⋅normal, 0) ds
s=0
Parameters
----------
flux
|Function| defining the analytical flux `f`.
flux_derivative
|Function| defining the analytical flux derivative `f'`.
gausspoints
Number of Gauss quadrature points to be used for integration.
intervals
Number of subintervals to be used for integration.
"""
def __init__(self, flux, flux_derivative, gausspoints=5, intervals=1):
self.__auto_init(locals())
self.build_parameter_type(flux, flux_derivative)
points, weights = GaussQuadratures.quadrature(npoints=self.gausspoints)
points = points / intervals
points = ((np.arange(self.intervals, dtype=np.float)[:, np.newaxis] * (1 / intervals))
+ points[np.newaxis, :]).ravel()
weights = np.tile(weights, intervals) * (1 / intervals)
self.points = points
self.weights = weights
def evaluate_stage1(self, U, mu=None):
int_els = np.abs(U)[:, np.newaxis, np.newaxis]
return [np.concatenate([self.flux_derivative(U[:, np.newaxis] * p, mu)[:, np.newaxis, :] * int_els * w
for p, w in zip(self.points, self.weights)], axis=1)]
def evaluate_stage2(self, stage1_data, unit_outer_normals, volumes, mu=None):
F0 = np.sum(self.flux.evaluate(np.array([[0.]]), mu=mu) * unit_outer_normals, axis=1)
Fs = np.sum(stage1_data[0] * unit_outer_normals[:, np.newaxis, np.newaxis, :], axis=3)
Fs[:, 0, :] = np.maximum(Fs[:, 0, :], 0)
Fs[:, 1, :] = np.minimum(Fs[:, 1, :], 0)
Fs = np.sum(np.sum(Fs, axis=2), axis=1) + F0
Fs *= volumes
return Fs
[docs]@defaults('delta')
def jacobian_options(delta=1e-7):
return {'delta': delta}
[docs]class NonlinearAdvectionOperator(OperatorBase):
"""Nonlinear finite volume advection |Operator|.
The operator is of the form ::
L(u, mu)(x) = ∇ ⋅ f(u(x), mu)
Parameters
----------
grid
|Grid| for which to evaluate the operator.
boundary_info
|BoundaryInfo| determining the Dirichlet and Neumann boundaries.
numerical_flux
The :class:`NumericalConvectiveFlux <NumericalConvectiveFluxInterface>` to use.
dirichlet_data
|Function| providing the Dirichlet boundary values. If `None`, constant-zero
boundary is assumed.
solver_options
The |solver_options| for the operator.
name
The name of the operator.
"""
sid_ignore = OperatorBase.sid_ignore | {'_grid_data'}
linear = False
def __init__(self, grid, boundary_info, numerical_flux, dirichlet_data=None, solver_options=None,
space_id='STATE', name=None):
assert dirichlet_data is None or isinstance(dirichlet_data, FunctionInterface)
self.__auto_init(locals())
if (isinstance(dirichlet_data, FunctionInterface) and boundary_info.has_dirichlet
and not dirichlet_data.parametric):
self._dirichlet_values = self.dirichlet_data(grid.centers(1)[boundary_info.dirichlet_boundaries(1)])
self._dirichlet_values = self._dirichlet_values.ravel()
self._dirichlet_values_flux_shaped = self._dirichlet_values.reshape((-1, 1))
self.source = self.range = FVVectorSpace(grid, space_id)
self.build_parameter_type(numerical_flux, dirichlet_data)
def with_numerical_flux(self, **kwargs):
return self.with_(numerical_flux=self.numerical_flux.with_(**kwargs))
[docs] def restricted(self, dofs):
source_dofs = np.setdiff1d(np.union1d(self.grid.neighbours(0, 0)[dofs].ravel(), dofs),
np.array([-1], dtype=np.int32),
assume_unique=True)
sub_grid = SubGrid(self.grid, source_dofs)
sub_boundary_info = make_sub_grid_boundary_info(sub_grid, self.grid, self.boundary_info)
op = self.with_(grid=sub_grid, boundary_info=sub_boundary_info, space_id=None,
name=f'{self.name}_restricted')
sub_grid_indices = sub_grid.indices_from_parent_indices(dofs, codim=0)
proj = ComponentProjection(sub_grid_indices, op.range)
return proj @ op, sub_grid.parent_indices(0)
def _fetch_grid_data(self):
# pre-fetch all grid-associated data to avoid searching the cache for each operator application
g = self.grid
bi = self.boundary_info
self._grid_data = dict(SUPE=g.superentities(1, 0),
SUPI=g.superentity_indices(1, 0),
VOLS0=g.volumes(0),
VOLS1=g.volumes(1),
BOUNDARIES=g.boundaries(1),
CENTERS=g.centers(1),
DIRICHLET_BOUNDARIES=bi.dirichlet_boundaries(1) if bi.has_dirichlet else None,
NEUMANN_BOUNDARIES=bi.neumann_boundaries(1) if bi.has_neumann else None)
self._grid_data.update(UNIT_OUTER_NORMALS=g.unit_outer_normals()[self._grid_data['SUPE'][:, 0],
self._grid_data['SUPI'][:, 0]])
[docs] def apply(self, U, mu=None):
assert U in self.source
mu = self.parse_parameter(mu)
if not hasattr(self, '_grid_data'):
self._fetch_grid_data()
U = U.to_numpy()
R = np.zeros((len(U), self.source.dim))
bi = self.boundary_info
gd = self._grid_data
SUPE = gd['SUPE']
VOLS0 = gd['VOLS0']
VOLS1 = gd['VOLS1']
BOUNDARIES = gd['BOUNDARIES']
CENTERS = gd['CENTERS']
DIRICHLET_BOUNDARIES = gd['DIRICHLET_BOUNDARIES']
NEUMANN_BOUNDARIES = gd['NEUMANN_BOUNDARIES']
UNIT_OUTER_NORMALS = gd['UNIT_OUTER_NORMALS']
if bi.has_dirichlet:
if hasattr(self, '_dirichlet_values'):
dirichlet_values = self._dirichlet_values
elif self.dirichlet_data is not None:
dirichlet_values = self.dirichlet_data(CENTERS[DIRICHLET_BOUNDARIES], mu=mu)
else:
dirichlet_values = np.zeros_like(DIRICHLET_BOUNDARIES)
F_dirichlet = self.numerical_flux.evaluate_stage1(dirichlet_values, mu)
for i, j in enumerate(range(len(U))):
Ui = U[j]
Ri = R[i]
F = self.numerical_flux.evaluate_stage1(Ui, mu)
F_edge = [f[SUPE] for f in F]
for f in F_edge:
f[BOUNDARIES, 1] = f[BOUNDARIES, 0]
if bi.has_dirichlet:
for f, f_d in zip(F_edge, F_dirichlet):
f[DIRICHLET_BOUNDARIES, 1] = f_d
NUM_FLUX = self.numerical_flux.evaluate_stage2(F_edge, UNIT_OUTER_NORMALS, VOLS1, mu)
if bi.has_neumann:
NUM_FLUX[NEUMANN_BOUNDARIES] = 0
iadd_masked(Ri, NUM_FLUX, SUPE[:, 0])
isub_masked(Ri, NUM_FLUX, SUPE[:, 1])
R /= VOLS0
return self.range.make_array(R)
[docs] def jacobian(self, U, mu=None):
assert U in self.source and len(U) == 1
mu = self.parse_parameter(mu)
if not hasattr(self, '_grid_data'):
self._fetch_grid_data()
U = U.to_numpy().ravel()
g = self.grid
bi = self.boundary_info
gd = self._grid_data
SUPE = gd['SUPE']
VOLS0 = gd['VOLS0']
VOLS1 = gd['VOLS1']
BOUNDARIES = gd['BOUNDARIES']
CENTERS = gd['CENTERS']
DIRICHLET_BOUNDARIES = gd['DIRICHLET_BOUNDARIES']
NEUMANN_BOUNDARIES = gd['NEUMANN_BOUNDARIES']
UNIT_OUTER_NORMALS = gd['UNIT_OUTER_NORMALS']
INNER = np.setdiff1d(np.arange(g.size(1)), BOUNDARIES)
solver_options = self.solver_options
delta = solver_options.get('jacobian_delta') if solver_options else None
if delta is None:
delta = jacobian_options()['delta']
if bi.has_dirichlet:
if hasattr(self, '_dirichlet_values'):
dirichlet_values = self._dirichlet_values
elif self.dirichlet_data is not None:
dirichlet_values = self.dirichlet_data(CENTERS[DIRICHLET_BOUNDARIES], mu=mu)
else:
dirichlet_values = np.zeros_like(DIRICHLET_BOUNDARIES)
F_dirichlet = self.numerical_flux.evaluate_stage1(dirichlet_values, mu)
UP = U + delta
UM = U - delta
F = self.numerical_flux.evaluate_stage1(U, mu)
FP = self.numerical_flux.evaluate_stage1(UP, mu)
FM = self.numerical_flux.evaluate_stage1(UM, mu)
del UP, UM
F_edge = [f[SUPE] for f in F]
FP_edge = [f[SUPE] for f in FP]
FM_edge = [f[SUPE] for f in FM]
del F, FP, FM
F0P_edge = [f.copy() for f in F_edge]
for f, ff in zip(F0P_edge, FP_edge):
f[:, 0] = ff[:, 0]
f[BOUNDARIES, 1] = f[BOUNDARIES, 0]
if bi.has_dirichlet:
for f, f_d in zip(F0P_edge, F_dirichlet):
f[DIRICHLET_BOUNDARIES, 1] = f_d
NUM_FLUX_0P = self.numerical_flux.evaluate_stage2(F0P_edge, UNIT_OUTER_NORMALS, VOLS1, mu)
del F0P_edge
F0M_edge = [f.copy() for f in F_edge]
for f, ff in zip(F0M_edge, FM_edge):
f[:, 0] = ff[:, 0]
f[BOUNDARIES, 1] = f[BOUNDARIES, 0]
if bi.has_dirichlet:
for f, f_d in zip(F0M_edge, F_dirichlet):
f[DIRICHLET_BOUNDARIES, 1] = f_d
NUM_FLUX_0M = self.numerical_flux.evaluate_stage2(F0M_edge, UNIT_OUTER_NORMALS, VOLS1, mu)
del F0M_edge
D_NUM_FLUX_0 = (NUM_FLUX_0P - NUM_FLUX_0M)
D_NUM_FLUX_0 /= (2 * delta)
if bi.has_neumann:
D_NUM_FLUX_0[NEUMANN_BOUNDARIES] = 0
del NUM_FLUX_0P, NUM_FLUX_0M
F1P_edge = [f.copy() for f in F_edge]
for f, ff in zip(F1P_edge, FP_edge):
f[:, 1] = ff[:, 1]
f[BOUNDARIES, 1] = f[BOUNDARIES, 0]
if bi.has_dirichlet:
for f, f_d in zip(F1P_edge, F_dirichlet):
f[DIRICHLET_BOUNDARIES, 1] = f_d
NUM_FLUX_1P = self.numerical_flux.evaluate_stage2(F1P_edge, UNIT_OUTER_NORMALS, VOLS1, mu)
del F1P_edge, FP_edge
F1M_edge = F_edge
for f, ff in zip(F1M_edge, FM_edge):
f[:, 1] = ff[:, 1]
f[BOUNDARIES, 1] = f[BOUNDARIES, 0]
if bi.has_dirichlet:
for f, f_d in zip(F1M_edge, F_dirichlet):
f[DIRICHLET_BOUNDARIES, 1] = f_d
NUM_FLUX_1M = self.numerical_flux.evaluate_stage2(F1M_edge, UNIT_OUTER_NORMALS, VOLS1, mu)
del F1M_edge, FM_edge
D_NUM_FLUX_1 = (NUM_FLUX_1P - NUM_FLUX_1M)
D_NUM_FLUX_1 /= (2 * delta)
if bi.has_neumann:
D_NUM_FLUX_1[NEUMANN_BOUNDARIES] = 0
del NUM_FLUX_1P, NUM_FLUX_1M
I1 = np.hstack([SUPE[INNER, 0], SUPE[INNER, 0], SUPE[INNER, 1], SUPE[INNER, 1], SUPE[BOUNDARIES, 0]])
I0 = np.hstack([SUPE[INNER, 0], SUPE[INNER, 1], SUPE[INNER, 0], SUPE[INNER, 1], SUPE[BOUNDARIES, 0]])
V = np.hstack([D_NUM_FLUX_0[INNER], -D_NUM_FLUX_0[INNER], D_NUM_FLUX_1[INNER], -D_NUM_FLUX_1[INNER],
D_NUM_FLUX_0[BOUNDARIES]])
A = coo_matrix((V, (I0, I1)), shape=(g.size(0), g.size(0)))
A = csc_matrix(A).copy() # See pymor.operators.cg.DiffusionOperatorP1 for why copy() is necessary
A = dia_matrix(([1. / VOLS0], [0]), shape=(g.size(0),) * 2) * A
return NumpyMatrixOperator(A, source_id=self.source.id, range_id=self.range.id)
[docs]def nonlinear_advection_lax_friedrichs_operator(grid, boundary_info, flux, lxf_lambda=1.0,
dirichlet_data=None, solver_options=None, name=None):
"""Instantiate a :class:`NonlinearAdvectionOperator` using :class:`LaxFriedrichsFlux`."""
num_flux = LaxFriedrichsFlux(flux, lxf_lambda)
return NonlinearAdvectionOperator(grid, boundary_info, num_flux, dirichlet_data, solver_options, name=name)
[docs]def nonlinear_advection_simplified_engquist_osher_operator(grid, boundary_info, flux, flux_derivative,
dirichlet_data=None, solver_options=None, name=None):
"""Instantiate a :class:`NonlinearAdvectionOperator` using :class:`SimplifiedEngquistOsherFlux`."""
num_flux = SimplifiedEngquistOsherFlux(flux, flux_derivative)
return NonlinearAdvectionOperator(grid, boundary_info, num_flux, dirichlet_data, solver_options, name=name)
[docs]def nonlinear_advection_engquist_osher_operator(grid, boundary_info, flux, flux_derivative, gausspoints=5, intervals=1,
dirichlet_data=None, solver_options=None, name=None):
"""Instantiate a :class:`NonlinearAdvectionOperator` using :class:`EngquistOsherFlux`."""
num_flux = EngquistOsherFlux(flux, flux_derivative, gausspoints=gausspoints, intervals=intervals)
return NonlinearAdvectionOperator(grid, boundary_info, num_flux, dirichlet_data, solver_options, name=name)
[docs]class LinearAdvectionLaxFriedrichs(NumpyMatrixBasedOperator):
"""Linear advection finite Volume |Operator| using Lax-Friedrichs flux.
The operator is of the form ::
L(u, mu)(x) = ∇ ⋅ (v(x, mu)⋅u(x))
See :class:`LaxFriedrichsFlux` for the definition of the Lax-Friedrichs flux.
Parameters
----------
grid
|Grid| over which to assemble the operator.
boundary_info
|BoundaryInfo| determining the Dirichlet and Neumann boundaries.
velocity_field
|Function| defining the velocity field `v`.
lxf_lambda
The stabilization parameter `λ`.
solver_options
The |solver_options| for the operator.
name
The name of the operator.
"""
def __init__(self, grid, boundary_info, velocity_field, lxf_lambda=1.0, solver_options=None, name=None):
self.__auto_init(locals())
self.source = self.range = FVVectorSpace(grid)
self.build_parameter_type(velocity_field)
def _assemble(self, mu=None):
g = self.grid
bi = self.boundary_info
SUPE = g.superentities(1, 0)
SUPI = g.superentity_indices(1, 0)
assert SUPE.ndim == 2
edge_volumes = g.volumes(1)
boundary_edges = g.boundaries(1)
inner_edges = np.setdiff1d(np.arange(g.size(1)), boundary_edges)
dirichlet_edges = bi.dirichlet_boundaries(1) if bi.has_dirichlet else np.array([], ndmin=1, dtype=np.int)
neumann_edges = bi.neumann_boundaries(1) if bi.has_neumann else np.array([], ndmin=1, dtype=np.int)
outflow_edges = np.setdiff1d(boundary_edges, np.hstack([dirichlet_edges, neumann_edges]))
normal_velocities = np.einsum('ei,ei->e',
self.velocity_field(g.centers(1), mu=mu),
g.unit_outer_normals()[SUPE[:, 0], SUPI[:, 0]])
nv_inner = normal_velocities[inner_edges]
l_inner = np.ones_like(nv_inner) * (1. / self.lxf_lambda)
I0_inner = np.hstack([SUPE[inner_edges, 0], SUPE[inner_edges, 0], SUPE[inner_edges, 1], SUPE[inner_edges, 1]])
I1_inner = np.hstack([SUPE[inner_edges, 0], SUPE[inner_edges, 1], SUPE[inner_edges, 0], SUPE[inner_edges, 1]])
V_inner = np.hstack([nv_inner, nv_inner, -nv_inner, -nv_inner])
V_inner += np.hstack([l_inner, -l_inner, -l_inner, l_inner])
V_inner *= np.tile(0.5 * edge_volumes[inner_edges], 4)
I_out = SUPE[outflow_edges, 0]
V_out = edge_volumes[outflow_edges] * normal_velocities[outflow_edges]
I_dir = SUPE[dirichlet_edges, 0]
V_dir = edge_volumes[dirichlet_edges] * (0.5 * normal_velocities[dirichlet_edges] + 0.5 / self.lxf_lambda)
I0 = np.hstack([I0_inner, I_out, I_dir])
I1 = np.hstack([I1_inner, I_out, I_dir])
V = np.hstack([V_inner, V_out, V_dir])
A = coo_matrix((V, (I0, I1)), shape=(g.size(0), g.size(0)))
A = csc_matrix(A).copy() # See pymor.operators.cg.DiffusionOperatorP1 for why copy() is necessary
A = dia_matrix(([1. / g.volumes(0)], [0]), shape=(g.size(0),) * 2) * A
return A
[docs]class L2Product(NumpyMatrixBasedOperator):
"""|Operator| representing the L2-product between finite volume functions.
Parameters
----------
grid
The |Grid| for which to assemble the product.
solver_options
The |solver_options| for the operator.
name
The name of the product.
"""
sparse = True
def __init__(self, grid, solver_options=None, name=None):
self.__auto_init(locals())
self.source = self.range = FVVectorSpace(grid)
def _assemble(self, mu=None):
A = dia_matrix((self.grid.volumes(0), [0]), shape=(self.grid.size(0),) * 2)
return A
[docs]class ReactionOperator(NumpyMatrixBasedOperator):
"""Finite Volume reaction |Operator|.
The operator is of the form ::
L(u, mu)(x) = c(x, mu)⋅u(x)
Parameters
----------
grid
The |Grid| for which to assemble the operator.
reaction_coefficient
The function 'c'
solver_options
The |solver_options| for the operator.
name
The name of the operator.
"""
sparse = True
def __init__(self, grid, reaction_coefficient, solver_options=None, name=None):
assert reaction_coefficient.dim_domain == grid.dim and reaction_coefficient.shape_range == ()
self.__auto_init(locals())
self.source = self.range = FVVectorSpace(grid)
self.build_parameter_type(reaction_coefficient)
def _assemble(self, mu=None):
A = dia_matrix((self.reaction_coefficient.evaluate(self.grid.centers(0), mu=mu), [0]),
shape=(self.grid.size(0),) * 2)
return A
[docs]class NonlinearReactionOperator(OperatorBase):
linear = False
def __init__(self, grid, reaction_function, reaction_function_derivative=None, space_id='STATE', name=None):
self.__auto_init(locals())
self.source = self.range = FVVectorSpace(grid, space_id)
self.build_parameter_type(reaction_function, reaction_function_derivative)
[docs] def apply(self, U, ind=None, mu=None):
assert U in self.source
R = U.to_numpy() if ind is None else U.to_numpy()[ind]
R = self.reaction_function.evaluate(R.reshape(R.shape + (1,)), mu=mu)
return self.range.make_array(R)
[docs] def jacobian(self, U, mu=None):
if self.reaction_function_derivative is None:
raise NotImplementedError
U = U.to_numpy()
A = dia_matrix((self.reaction_function_derivative.evaluate(U.reshape(U.shape + (1,)), mu=mu), [0]),
shape=(self.grid.size(0),) * 2)
return NumpyMatrixOperator(A, source_id=self.source.id, range_id=self.range.id)
[docs]class L2ProductFunctional(NumpyMatrixBasedOperator):
"""Finite volume functional representing the inner product with an L2-|Function|.
Additionally, boundary conditions can be enforced by providing `dirichlet_data`
and `neumann_data` functions.
Parameters
----------
grid
|Grid| for which to assemble the functional.
function
The |Function| with which to take the inner product or `None`.
boundary_info
|BoundaryInfo| determining the Dirichlet and Neumann boundaries or `None`.
If `None`, no boundary treatment is performed.
dirichlet_data
|Function| providing the Dirichlet boundary values. If `None`,
constant-zero boundary is assumed.
diffusion_function
See :class:`DiffusionOperator`. Has to be specified in case `dirichlet_data`
is given.
diffusion_constant
See :class:`DiffusionOperator`. Has to be specified in case `dirichlet_data`
is given.
neumann_data
|Function| providing the Neumann boundary values. If `None`,
constant-zero is assumed.
order
Order of the Gauss quadrature to use for numerical integration.
name
The name of the functional.
"""
source = NumpyVectorSpace(1)
sparse = False
def __init__(self, grid, function=None, boundary_info=None, dirichlet_data=None, diffusion_function=None,
diffusion_constant=None, neumann_data=None, order=1, name=None):
assert function is None or function.shape_range == ()
self.__auto_init(locals())
self.range = FVVectorSpace(grid)
self.build_parameter_type(function, dirichlet_data, diffusion_function, neumann_data)
def _assemble(self, mu=None):
g = self.grid
bi = self.boundary_info
if self.function is not None:
# evaluate function at all quadrature points -> shape = (g.size(0), number of quadrature points, 1)
F = self.function(g.quadrature_points(0, order=self.order), mu=mu)
_, w = g.reference_element.quadrature(order=self.order)
# integrate the products of the function with the shape functions on each element
# -> shape = (g.size(0), number of shape functions)
F_INTS = np.einsum('ei,e,i->e', F, g.integration_elements(0), w).ravel()
else:
F_INTS = np.zeros(g.size(0))
if bi is not None and (bi.has_dirichlet and self.dirichlet_data is not None
or bi.has_neumann and self.neumann_data):
centers = g.centers(1)
superentities = g.superentities(1, 0)
superentity_indices = g.superentity_indices(1, 0)
SE_I0 = superentities[:, 0]
VOLS = g.volumes(1)
FLUXES = np.zeros(g.size(1))
if bi.has_dirichlet and self.dirichlet_data is not None:
dirichlet_mask = bi.dirichlet_mask(1)
SE_I0_D = SE_I0[dirichlet_mask]
boundary_normals = g.unit_outer_normals()[SE_I0_D, superentity_indices[:, 0][dirichlet_mask]]
BOUNDARY_DISTS = np.sum((centers[dirichlet_mask, :] - g.orthogonal_centers()[SE_I0_D, :])
* boundary_normals,
axis=-1)
DIRICHLET_FLUXES = VOLS[dirichlet_mask] * self.dirichlet_data(centers[dirichlet_mask], mu=mu) / BOUNDARY_DISTS
if self.diffusion_function is not None:
DIRICHLET_FLUXES *= self.diffusion_function(centers[dirichlet_mask], mu=mu)
if self.diffusion_constant is not None:
DIRICHLET_FLUXES *= self.diffusion_constant
FLUXES[dirichlet_mask] = DIRICHLET_FLUXES
if bi.has_neumann and self.neumann_data is not None:
neumann_mask = bi.neumann_mask(1)
FLUXES[neumann_mask] -= VOLS[neumann_mask] * self.neumann_data(centers[neumann_mask], mu=mu)
F_INTS += np.bincount(SE_I0, weights=FLUXES, minlength=len(F_INTS))
F_INTS /= g.volumes(0)
return F_INTS.reshape((-1, 1))
[docs]class DiffusionOperator(NumpyMatrixBasedOperator):
"""Finite Volume Diffusion |Operator|.
The operator is of the form ::
(Lu)(x) = c ∇ ⋅ [ d(x) ∇ u(x) ]
Parameters
----------
grid
The |Grid| over which to assemble the operator.
boundary_info
|BoundaryInfo| for the treatment of Dirichlet boundary conditions.
diffusion_function
The scalar-valued |Function| `d(x)`. If `None`, constant one is assumed.
diffusion_constant
The constant `c`. If `None`, `c` is set to one.
solver_options
The |solver_options| for the operator.
name
Name of the operator.
"""
sparse = True
def __init__(self, grid, boundary_info, diffusion_function=None, diffusion_constant=None, solver_options=None,
name=None):
super().__init__()
assert isinstance(grid, AffineGridWithOrthogonalCentersInterface)
assert (diffusion_function is None
or (isinstance(diffusion_function, FunctionInterface)
and diffusion_function.dim_domain == grid.dim
and diffusion_function.shape_range == ()))
self.__auto_init(locals())
self.source = self.range = FVVectorSpace(grid)
if diffusion_function is not None:
self.build_parameter_type(diffusion_function)
def _assemble(self, mu=None):
grid = self.grid
# compute the local coordinates of the codim-1 subentity centers in the reference element
reference_element = grid.reference_element(0)
subentity_embedding = reference_element.subentity_embedding(1)
subentity_centers = (np.einsum('eij,j->ei',
subentity_embedding[0], reference_element.sub_reference_element(1).center())
+ subentity_embedding[1])
# compute shift for periodic boundaries
embeddings = grid.embeddings(0)
superentities = grid.superentities(1, 0)
superentity_indices = grid.superentity_indices(1, 0)
boundary_mask = grid.boundary_mask(1)
inner_mask = ~boundary_mask
SE_I0 = superentities[:, 0]
SE_I1 = superentities[:, 1]
SE_I0_I = SE_I0[inner_mask]
SE_I1_I = SE_I1[inner_mask]
SHIFTS = (np.einsum('eij,ej->ei',
embeddings[0][SE_I0_I, :, :],
subentity_centers[superentity_indices[:, 0][inner_mask]])
+ embeddings[1][SE_I0_I, :])
SHIFTS -= (np.einsum('eij,ej->ei',
embeddings[0][SE_I1_I, :, :],
subentity_centers[superentity_indices[:, 1][inner_mask]])
+ embeddings[1][SE_I1_I, :])
# comute distances for gradient approximations
centers = grid.centers(1)
orthogonal_centers = grid.orthogonal_centers()
VOLS = grid.volumes(1)
INNER_DISTS = np.linalg.norm(orthogonal_centers[SE_I0_I, :] - orthogonal_centers[SE_I1_I, :] - SHIFTS,
axis=1)
del SHIFTS
# assemble matrix
FLUXES = VOLS[inner_mask] / INNER_DISTS
if self.diffusion_function is not None:
FLUXES *= self.diffusion_function(centers[inner_mask], mu=mu)
if self.diffusion_constant is not None:
FLUXES *= self.diffusion_constant
del INNER_DISTS
FLUXES = np.concatenate((-FLUXES, -FLUXES, FLUXES, FLUXES))
FLUXES_I0 = np.concatenate((SE_I0_I, SE_I1_I, SE_I0_I, SE_I1_I))
FLUXES_I1 = np.concatenate((SE_I1_I, SE_I0_I, SE_I0_I, SE_I1_I))
if self.boundary_info.has_dirichlet:
dirichlet_mask = self.boundary_info.dirichlet_mask(1)
SE_I0_D = SE_I0[dirichlet_mask]
boundary_normals = grid.unit_outer_normals()[SE_I0_D, superentity_indices[:, 0][dirichlet_mask]]
BOUNDARY_DISTS = np.sum((centers[dirichlet_mask, :] - orthogonal_centers[SE_I0_D, :]) * boundary_normals,
axis=-1)
DIRICHLET_FLUXES = VOLS[dirichlet_mask] / BOUNDARY_DISTS
if self.diffusion_function is not None:
DIRICHLET_FLUXES *= self.diffusion_function(centers[dirichlet_mask], mu=mu)
if self.diffusion_constant is not None:
DIRICHLET_FLUXES *= self.diffusion_constant
FLUXES = np.concatenate((FLUXES, DIRICHLET_FLUXES))
FLUXES_I0 = np.concatenate((FLUXES_I0, SE_I0_D))
FLUXES_I1 = np.concatenate((FLUXES_I1, SE_I0_D))
A = coo_matrix((FLUXES, (FLUXES_I0, FLUXES_I1)), shape=(self.source.dim, self.source.dim))
A = (dia_matrix(([1. / grid.volumes(0)], [0]), shape=(grid.size(0),) * 2) * A).tocsc()
return A