Source code for pymordemos.thermalblock

#!/usr/bin/env python
# This file is part of the pyMOR project (http://www.pymor.org).
# Copyright 2013-2020 pyMOR developers and contributors. All rights reserved.
# License: BSD 2-Clause License (http://opensource.org/licenses/BSD-2-Clause)

import sys
import time

from typer import Argument, Option, run

from pymor.algorithms.error import reduction_error_analysis
from pymor.core.pickle import dump
from pymor.parallel.default import new_parallel_pool
from pymor.tools.typer import Choices


[docs]def main( xblocks: int = Argument(..., help='Number of blocks in x direction.'), yblocks: int = Argument(..., help='Number of blocks in y direction.'), snapshots: int = Argument( ..., help='naive: ignored\n\n' 'greedy/pod: Number of training_set parameters per block ' '(in total SNAPSHOTS^(XBLOCKS * YBLOCKS) parameters).\n\n' 'adaptive_greedy: size of validation set.\n\n' ), rbsize: int = Argument(..., help='Size of the reduced basis.'), adaptive_greedy_gamma: float = Option(0.2, help='See pymor.algorithms.adaptivegreedy.'), adaptive_greedy_rho: float = Option(1.1, help='See pymor.algorithms.adaptivegreedy.'), adaptive_greedy_theta: float = Option(0., help='See pymor.algorithms.adaptivegreedy.'), alg: Choices('naive greedy adaptive_greedy pod') = Option('greedy', help='The model reduction algorithm to use.'), cache_region: Choices('none memory disk persistent') = Option( 'none', help='Name of cache region to use for caching solution snapshots.' ), extension_alg: Choices('trivial gram_schmidt') = Option( 'gram_schmidt', help='Basis extension algorithm to be used.' ), fenics: bool = Option(False, help='Use FEniCS model.'), greedy_with_error_estimator: bool = Option(True, help='Use error estimator for basis generation.'), grid: int = Option(100, help='Use grid with 4*NI*NI elements'), ipython_engines: int = Option( None, help='If positive, the number of IPython cluster engines to use for ' 'parallel greedy search. If zero, no parallelization is performed.' ), ipython_profile: str = Option(None, help='IPython profile to use for parallelization.'), list_vector_array: bool = Option( False, help='Solve using ListVectorArray[NumpyVector] instead of NumpyVectorArray.' ), order: int = Option(1, help='Polynomial order of the Lagrange finite elements to use in FEniCS.'), pickle: str = Option( None, help='Pickle reduced model, as well as reductor and high-dimensional model ' 'to files with this prefix.' ), product: Choices('euclidean h1') = Option( 'h1', help='Product w.r.t. which to orthonormalize and calculate Riesz representatives.' ), plot_err: bool = Option(False, help='Plot error'), plot_error_sequence: bool = Option(False, help='Plot reduction error vs. basis size.'), plot_solutions: bool = Option(False, help='Plot some example solutions.'), reductor: Choices('traditional residual_basis') = Option( 'residual_basis', help='Reductor (error estimator) to choose.' ), test: int = Option(10, help='Use COUNT snapshots for stochastic error estimation.'), ): """Thermalblock demo.""" if fenics and cache_region != 'none': raise ValueError('Caching of high-dimensional solutions is not supported for FEniCS model.') if not fenics and order != 1: raise ValueError('Higher-order finite elements only supported for FEniCS model.') pool = new_parallel_pool(ipython_num_engines=ipython_engines, ipython_profile=ipython_profile) if fenics: fom, fom_summary = discretize_fenics(xblocks, yblocks, grid, order) else: fom, fom_summary = discretize_pymor(xblocks, yblocks, grid, list_vector_array) parameter_space = fom.parameters.space(0.1, 1.) if cache_region != 'none': # building a cache_id is only needed for persistent CacheRegions cache_id = (f"pymordemos.thermalblock {fenics} {xblocks} {yblocks}" f"{grid} {order}") fom.enable_caching(cache_region.value, cache_id) if plot_solutions: print('Showing some solutions') Us = () legend = () for mu in parameter_space.sample_randomly(2): print(f"Solving for diffusion = \n{mu['diffusion']} ... ") sys.stdout.flush() Us = Us + (fom.solve(mu),) legend = legend + (str(mu['diffusion']),) fom.visualize(Us, legend=legend, title='Detailed Solutions for different parameters', separate_colorbars=False, block=True) print('RB generation ...') # define estimator for coercivity constant from pymor.parameters.functionals import ExpressionParameterFunctional coercivity_estimator = ExpressionParameterFunctional('min(diffusion)', fom.parameters) # inner product for computation of Riesz representatives product = fom.h1_0_semi_product if product == 'h1' else None if reductor == 'residual_basis': from pymor.reductors.coercive import CoerciveRBReductor reductor = CoerciveRBReductor(fom, product=product, coercivity_estimator=coercivity_estimator, check_orthonormality=False) elif reductor == 'traditional': from pymor.reductors.coercive import SimpleCoerciveRBReductor reductor = SimpleCoerciveRBReductor(fom, product=product, coercivity_estimator=coercivity_estimator, check_orthonormality=False) else: assert False # this should never happen if alg == 'naive': rom, red_summary = reduce_naive(fom=fom, reductor=reductor, parameter_space=parameter_space, basis_size=rbsize) elif alg == 'greedy': parallel = greedy_with_error_estimator or not fenics # cannot pickle FEniCS model rom, red_summary = reduce_greedy(fom=fom, reductor=reductor, parameter_space=parameter_space, snapshots_per_block=snapshots, extension_alg_name=extension_alg.value, max_extensions=rbsize, use_error_estimator=greedy_with_error_estimator, pool=pool if parallel else None) elif alg == 'adaptive_greedy': parallel = greedy_with_error_estimator or not fenics # cannot pickle FEniCS model rom, red_summary = reduce_adaptive_greedy(fom=fom, reductor=reductor, parameter_space=parameter_space, validation_mus=snapshots, extension_alg_name=extension_alg.value, max_extensions=rbsize, use_error_estimator=greedy_with_error_estimator, rho=adaptive_greedy_rho, gamma=adaptive_greedy_gamma, theta=adaptive_greedy_theta, pool=pool if parallel else None) elif alg == 'pod': rom, red_summary = reduce_pod(fom=fom, reductor=reductor, parameter_space=parameter_space, snapshots_per_block=snapshots, basis_size=rbsize) else: assert False # this should never happen if pickle: print(f"\nWriting reduced model to file {pickle}_reduced ...") with open(pickle + '_reduced', 'wb') as f: dump((rom, parameter_space), f) if not fenics: # FEniCS data structures do not support serialization print(f"Writing detailed model and reductor to file {pickle}_detailed ...") with open(pickle + '_detailed', 'wb') as f: dump((fom, reductor), f) print('\nSearching for maximum error on random snapshots ...') results = reduction_error_analysis(rom, fom=fom, reductor=reductor, error_estimator=True, error_norms=(fom.h1_0_semi_norm, fom.l2_norm), condition=True, test_mus=parameter_space.sample_randomly(test, seed=999), basis_sizes=0 if plot_error_sequence else 1, plot=plot_error_sequence, pool=None if fenics else pool # cannot pickle FEniCS model ) print('\n*** RESULTS ***\n') print(fom_summary) print(red_summary) print(results['summary']) sys.stdout.flush() if plot_error_sequence: import matplotlib.pyplot matplotlib.pyplot.show() if plot_err: mumax = results['max_error_mus'][0, -1] U = fom.solve(mumax) URB = reductor.reconstruct(rom.solve(mumax)) fom.visualize((U, URB, U - URB), legend=('Detailed Solution', 'Reduced Solution', 'Error'), title='Maximum Error Solution', separate_colorbars=True, block=True) global test_results test_results = results
[docs]def discretize_pymor(xblocks, yblocks, grid_num_intervals, use_list_vector_array): from pymor.analyticalproblems.thermalblock import thermal_block_problem from pymor.discretizers.builtin import discretize_stationary_cg from pymor.discretizers.builtin.list import convert_to_numpy_list_vector_array print('Discretize ...') # setup analytical problem problem = thermal_block_problem(num_blocks=(xblocks, yblocks)) # discretize using continuous finite elements fom, _ = discretize_stationary_cg(problem, diameter=1. / grid_num_intervals) if use_list_vector_array: fom = convert_to_numpy_list_vector_array(fom) summary = f'''pyMOR model: number of blocks: {xblocks}x{yblocks} grid intervals: {grid_num_intervals} ListVectorArray: {use_list_vector_array} ''' return fom, summary
[docs]def discretize_fenics(xblocks, yblocks, grid_num_intervals, element_order): from pymor.tools import mpi if mpi.parallel: from pymor.models.mpi import mpi_wrap_model fom = mpi_wrap_model(lambda: _discretize_fenics(xblocks, yblocks, grid_num_intervals, element_order), use_with=True, pickle_local_spaces=False) else: fom = _discretize_fenics(xblocks, yblocks, grid_num_intervals, element_order) summary = f'''FEniCS model: number of blocks: {xblocks}x{yblocks} grid intervals: {grid_num_intervals} finite element order: {element_order} ''' return fom, summary
def _discretize_fenics(xblocks, yblocks, grid_num_intervals, element_order): # assemble system matrices - FEniCS code ######################################## import dolfin as df mesh = df.UnitSquareMesh(grid_num_intervals, grid_num_intervals, 'crossed') V = df.FunctionSpace(mesh, 'Lagrange', element_order) u = df.TrialFunction(V) v = df.TestFunction(V) diffusion = df.Expression('(lower0 <= x[0]) * (open0 ? (x[0] < upper0) : (x[0] <= upper0)) *' '(lower1 <= x[1]) * (open1 ? (x[1] < upper1) : (x[1] <= upper1))', lower0=0., upper0=0., open0=0, lower1=0., upper1=0., open1=0, element=df.FunctionSpace(mesh, 'DG', 0).ufl_element()) def assemble_matrix(x, y, nx, ny): diffusion.user_parameters['lower0'] = x/nx diffusion.user_parameters['lower1'] = y/ny diffusion.user_parameters['upper0'] = (x + 1)/nx diffusion.user_parameters['upper1'] = (y + 1)/ny diffusion.user_parameters['open0'] = (x + 1 == nx) diffusion.user_parameters['open1'] = (y + 1 == ny) return df.assemble(df.inner(diffusion * df.nabla_grad(u), df.nabla_grad(v)) * df.dx) mats = [assemble_matrix(x, y, xblocks, yblocks) for x in range(xblocks) for y in range(yblocks)] mat0 = mats[0].copy() mat0.zero() h1_mat = df.assemble(df.inner(df.nabla_grad(u), df.nabla_grad(v)) * df.dx) l2_mat = df.assemble(u * v * df.dx) f = df.Constant(1.) * v * df.dx F = df.assemble(f) bc = df.DirichletBC(V, 0., df.DomainBoundary()) for m in mats: bc.zero(m) bc.apply(mat0) bc.apply(h1_mat) bc.apply(F) # wrap everything as a pyMOR model ################################## # FEniCS wrappers from pymor.bindings.fenics import FenicsVectorSpace, FenicsMatrixOperator, FenicsVisualizer # generic pyMOR classes from pymor.models.basic import StationaryModel from pymor.operators.constructions import LincombOperator, VectorOperator from pymor.parameters.functionals import ProjectionParameterFunctional # define parameter functionals (same as in pymor.analyticalproblems.thermalblock) def parameter_functional_factory(x, y): return ProjectionParameterFunctional('diffusion', size=yblocks*xblocks, index=yblocks - y - 1 + x * yblocks, name=f'diffusion_{x}_{y}') parameter_functionals = tuple(parameter_functional_factory(x, y) for x in range(xblocks) for y in range(yblocks)) # wrap operators ops = [FenicsMatrixOperator(mat0, V, V)] + [FenicsMatrixOperator(m, V, V) for m in mats] op = LincombOperator(ops, (1.,) + parameter_functionals) rhs = VectorOperator(FenicsVectorSpace(V).make_array([F])) h1_product = FenicsMatrixOperator(h1_mat, V, V, name='h1_0_semi') l2_product = FenicsMatrixOperator(l2_mat, V, V, name='l2') # build model visualizer = FenicsVisualizer(FenicsVectorSpace(V)) fom = StationaryModel(op, rhs, products={'h1_0_semi': h1_product, 'l2': l2_product}, visualizer=visualizer) return fom
[docs]def reduce_naive(fom, reductor, parameter_space, basis_size): tic = time.perf_counter() training_set = parameter_space.sample_randomly(basis_size) for mu in training_set: reductor.extend_basis(fom.solve(mu), method='trivial') rom = reductor.reduce() elapsed_time = time.perf_counter() - tic summary = f'''Naive basis generation: basis size set: {basis_size} elapsed time: {elapsed_time} ''' return rom, summary
[docs]def reduce_greedy(fom, reductor, parameter_space, snapshots_per_block, extension_alg_name, max_extensions, use_error_estimator, pool): from pymor.algorithms.greedy import rb_greedy # run greedy training_set = parameter_space.sample_uniformly(snapshots_per_block) greedy_data = rb_greedy(fom, reductor, training_set, use_error_estimator=use_error_estimator, error_norm=fom.h1_0_semi_norm, extension_params={'method': extension_alg_name}, max_extensions=max_extensions, pool=pool) rom = greedy_data['rom'] # generate summary real_rb_size = rom.solution_space.dim training_set_size = len(training_set) summary = f'''Greedy basis generation: size of training set: {training_set_size} error estimator used: {use_error_estimator} extension method: {extension_alg_name} prescribed basis size: {max_extensions} actual basis size: {real_rb_size} elapsed time: {greedy_data["time"]} ''' return rom, summary
[docs]def reduce_adaptive_greedy(fom, reductor, parameter_space, validation_mus, extension_alg_name, max_extensions, use_error_estimator, rho, gamma, theta, pool): from pymor.algorithms.adaptivegreedy import rb_adaptive_greedy # run greedy greedy_data = rb_adaptive_greedy(fom, reductor, parameter_space, validation_mus=-validation_mus, use_error_estimator=use_error_estimator, error_norm=fom.h1_0_semi_norm, extension_params={'method': extension_alg_name}, max_extensions=max_extensions, rho=rho, gamma=gamma, theta=theta, pool=pool) rom = greedy_data['rom'] # generate summary real_rb_size = rom.solution_space.dim # the validation set consists of `validation_mus` random parameters plus the centers of the adaptive sample set cells validation_mus += 1 summary = f'''Adaptive greedy basis generation: initial size of validation set: {validation_mus} error estimator used: {use_error_estimator} extension method: {extension_alg_name} prescribed basis size: {max_extensions} actual basis size: {real_rb_size} elapsed time: {greedy_data["time"]} ''' return rom, summary
[docs]def reduce_pod(fom, reductor, parameter_space, snapshots_per_block, basis_size): from pymor.algorithms.pod import pod tic = time.perf_counter() training_set = parameter_space.sample_uniformly(snapshots_per_block) print('Solving on training set ...') snapshots = fom.operator.source.empty(reserve=len(training_set)) for mu in training_set: snapshots.append(fom.solve(mu)) print('Performing POD ...') basis, singular_values = pod(snapshots, modes=basis_size, product=reductor.products['RB']) print('Reducing ...') reductor.extend_basis(basis, method='trivial') rom = reductor.reduce() elapsed_time = time.perf_counter() - tic # generate summary real_rb_size = rom.solution_space.dim training_set_size = len(training_set) summary = f'''POD basis generation: size of training set: {training_set_size} prescribed basis size: {basis_size} actual basis size: {real_rb_size} elapsed time: {elapsed_time} ''' return rom, summary
if __name__ == '__main__': run(main)