Source code for pymor.algorithms.hapod

# This file is part of the pyMOR project (http://www.pymor.org).
# Copyright 2013-2020 pyMOR developers and contributors. All rights reserved.
# License: BSD 2-Clause License (http://opensource.org/licenses/BSD-2-Clause)

import asyncio
from math import ceil
from queue import LifoQueue

import numpy as np

from pymor.algorithms.pod import pod
from pymor.core.base import BasicObject, abstractmethod
from pymor.core.logger import getLogger


class Tree(BasicObject):
    """A rooted tree."""

    root = 0

    @abstractmethod
    def children(self, node):
        pass

    @property
    def depth(self):
        def get_depth(node):
            children = self.children(node)
            if children:
                return 1 + max(get_depth(c) for c in children)
            else:
                return 1
        return get_depth(self.root)

    def is_leaf(self, node):
        return not self.children(node)

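# A minimal sketch of a concrete `Tree` subclass (hypothetical, for
# illustration only, not part of pyMOR): a complete binary tree with
# `levels` levels, using heap-style node numbering. A subclass only has to
# set `root` and implement `children`; `depth` and `is_leaf` then work
# automatically.
class _ExampleBinaryTree(Tree):

    def __init__(self, levels):
        self.levels = levels
        self.root = 1  # heap-style numbering: children of n are 2n, 2n+1

    def children(self, node):
        if node >= 2 ** (self.levels - 1):  # nodes on the last level are leaves
            return ()
        return (2*node, 2*node + 1)

# _ExampleBinaryTree(3).depth == 3; nodes 4, ..., 7 are its leaves.
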
class IncHAPODTree(Tree):

    def __init__(self, steps):
        self.steps = steps
        self.root = steps

    def children(self, node):
        if node < 0:
            return ()
        elif node == 1:
            return (-1,)
        else:
            return (node - 1, -node)

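# The incremental topology is a chain: each positive node merges the POD of
# the previous step with one new leaf (the negative nodes; `inc_hapod` below
# maps leaf `-k` to snapshot chunk `k - 1`). For `IncHAPODTree(3)`:
#
#         3 (root)
#        / \
#       2   -3
#      / \
#     1   -2
#     |
#    -1
#
#     tree = IncHAPODTree(3)
#     assert tree.children(3) == (2, -3)
#     assert tree.children(1) == (-1,)
#     assert tree.depth == 4
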
class DistHAPODTree(Tree):

    def __init__(self, slices):
        self.root = slices

    def children(self, node):
        return tuple(range(self.root)) if node == self.root else ()

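# The distributed topology is flat: the root (numbered `num_slices`) has all
# slice nodes 0, ..., num_slices - 1 as direct children, so the tree always
# has depth 2, e.g. DistHAPODTree(4).children(4) == (0, 1, 2, 3).
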
def default_pod_method(U, eps, is_root_node, product):
    # re-orthonormalize the modes only at the root node (orth_tol=np.inf
    # effectively disables the orthonormality check at inner nodes)
    return pod(U, atol=0., rtol=0.,
               l2_err=eps, product=product,
               orth_tol=None if is_root_node else np.inf)

def hapod(tree, snapshots, local_eps, product=None, pod_method=default_pod_method,
          executor=None, eval_snapshots_in_executor=False):
    """Compute the Hierarchical Approximate POD.

    This is an implementation of the HAPOD algorithm from [HLR18]_.

    Parameters
    ----------
    tree
        A :class:`Tree` defining the worker topology.
    snapshots
        A mapping `snapshots(node)` returning for each leaf node the
        associated snapshot vectors.
    local_eps
        A mapping `local_eps(node, snap_count, num_vecs)` assigning to each
        tree node `node` an l2 truncation error tolerance for the local POD
        based on the number of input vectors `num_vecs` and the total number
        of snapshot vectors below the given node `snap_count`.
    product
        Inner product |Operator| w.r.t. which to compute the POD.
    pod_method
        A function `pod_method(U, eps, root_node, product)` for computing the
        POD of the |VectorArray| `U` w.r.t. the given inner product `product`
        and the l2 error tolerance `eps`. `root_node` is set to `True` when
        the POD is computed at the root of the tree.
    executor
        If not `None`, a :class:`concurrent.futures.Executor` object to use
        for parallelization.
    eval_snapshots_in_executor
        If `True`, also parallelize the evaluation of the snapshot map.

    Returns
    -------
    modes
        The computed POD modes.
    svals
        The associated singular values.
    snap_count
        The total number of input snapshot vectors.
    """
    logger = getLogger('pymor.algorithms.hapod.hapod')

    async def hapod_step(node):
        children = tree.children(node)
        if children:
            modes, svals, snap_counts = zip(*await asyncio.gather(*(hapod_step(c) for c in children)))
            for m, sv in zip(modes, svals):
                m.scal(sv)
            U = modes[0]
            for V in modes[1:]:
                U.append(V, remove_from_other=True)
            snap_count = sum(snap_counts)
        else:
            if eval_snapshots_in_executor:
                U = await executor.submit(snapshots, node)
            else:
                U = snapshots(node)
            snap_count = len(U)

        with logger.block(f'Processing node {node}'):
            eps = local_eps(node, snap_count, len(U))
            if eps:
                modes, svals = await executor.submit(pod_method, U, eps, node == tree.root, product)
                return modes, svals, snap_count
            else:
                return U.copy(), np.ones(len(U)), snap_count

    # setup asyncio event loop
    loop = asyncio.get_event_loop()
    if loop.is_closed():
        # probably we have closed the event loop ourselves in an earlier hapod call
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    loop_was_running = loop.is_running()

    # wrap executor to ensure LIFO ordering of tasks
    # this ensures that PODs of parent nodes are computed as soon as all input data
    # is available
    if executor is not None:
        executor = LifoExecutor(executor)
    else:
        executor = FakeExecutor

    # perform HAPOD
    result = loop.run_until_complete(hapod_step(tree.root))

    # shutdown event loop
    if not loop_was_running:
        # we haven't been inside a running event loop
        loop.close()

    return result

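# A usage sketch (hypothetical helper, assuming NumPy-backed snapshot data):
# wiring a `DistHAPODTree` and `std_local_eps` into `hapod` by hand, which is
# exactly what `dist_hapod` below does for you.
def _hapod_usage_example():
    from pymor.vectorarrays.numpy import NumpyVectorSpace

    space = NumpyVectorSpace(100)
    chunks = [space.random(10) for _ in range(4)]    # four snapshot slices

    tree = DistHAPODTree(len(chunks))
    modes, svals, snap_count = hapod(
        tree,
        lambda node: chunks[node],                   # leaf node -> its slice
        std_local_eps(tree, 1e-4, 0.9),              # spread the error budget
    )
    assert snap_count == sum(len(c) for c in chunks)
    return modes, svals
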
def inc_hapod(steps, snapshots, eps, omega, product=None,
              executor=None, eval_snapshots_in_executor=False):
    """Incremental Hierarchical Approximate POD.

    This computes the incremental HAPOD from [HLR18]_.

    Parameters
    ----------
    steps
        The number of incremental POD updates.
    snapshots
        A mapping `snapshots(step)` returning for each incremental POD
        step the associated snapshot vectors.
    eps
        Desired l2-mean approximation error.
    omega
        Tuning parameter (0 < omega < 1) to balance performance with
        approximation quality.
    product
        Inner product |Operator| w.r.t. which to compute the POD.
    executor
        If not `None`, a :class:`concurrent.futures.Executor` object to use
        for parallelization.
    eval_snapshots_in_executor
        If `True`, also parallelize the evaluation of the snapshot map.

    Returns
    -------
    modes
        The computed POD modes.
    svals
        The associated singular values.
    snap_count
        The total number of input snapshot vectors.
    """
    tree = IncHAPODTree(steps)
    return hapod(tree,
                 lambda node: snapshots(-node - 1),
                 std_local_eps(tree, eps, omega, False),
                 product=product, executor=executor,
                 eval_snapshots_in_executor=eval_snapshots_in_executor)

def dist_hapod(num_slices, snapshots, eps, omega, product=None,
               executor=None, eval_snapshots_in_executor=False):
    """Distributed Hierarchical Approximate POD.

    This computes the distributed HAPOD from [HLR18]_.

    Parameters
    ----------
    num_slices
        The number of snapshot vector slices.
    snapshots
        A mapping `snapshots(slice)` returning for each slice number the
        associated snapshot vectors.
    eps
        Desired l2-mean approximation error.
    omega
        Tuning parameter (0 < omega < 1) to balance performance with
        approximation quality.
    product
        Inner product |Operator| w.r.t. which to compute the POD.
    executor
        If not `None`, a :class:`concurrent.futures.Executor` object to use
        for parallelization.
    eval_snapshots_in_executor
        If `True`, also parallelize the evaluation of the snapshot map.

    Returns
    -------
    modes
        The computed POD modes.
    svals
        The associated singular values.
    snap_count
        The total number of input snapshot vectors.
    """
    tree = DistHAPODTree(num_slices)
    return hapod(tree,
                 snapshots,
                 std_local_eps(tree, eps, omega, True),
                 product=product, executor=executor,
                 eval_snapshots_in_executor=eval_snapshots_in_executor)

def inc_vectorarray_hapod(steps, U, eps, omega, product=None, executor=None):
    """Incremental Hierarchical Approximate POD.

    This computes the incremental HAPOD from [HLR18]_ for a given |VectorArray|.

    Parameters
    ----------
    steps
        The number of incremental POD updates.
    U
        The |VectorArray| of which to compute the HAPOD.
    eps
        Desired l2-mean approximation error.
    omega
        Tuning parameter (0 < omega < 1) to balance performance with
        approximation quality.
    product
        Inner product |Operator| w.r.t. which to compute the POD.
    executor
        If not `None`, a :class:`concurrent.futures.Executor` object to use
        for parallelization.

    Returns
    -------
    modes
        The computed POD modes.
    svals
        The associated singular values.
    snap_count
        The total number of input snapshot vectors.
    """
    chunk_size = ceil(len(U) / steps)
    slices = range(0, len(U), chunk_size)
    return inc_hapod(len(slices),
                     lambda i: U[slices[i]: slices[i]+chunk_size],
                     eps, omega, product=product, executor=executor)

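# A usage sketch (hypothetical, with random data standing in for solution
# trajectories): an incremental HAPOD over a |VectorArray| split into 10
# chunks, aiming at an l2-mean error of 1e-4.
def _inc_vectorarray_hapod_example():
    from pymor.vectorarrays.numpy import NumpyVectorSpace

    U = NumpyVectorSpace(1000).random(200)
    modes, svals, snap_count = inc_vectorarray_hapod(10, U, 1e-4, 0.75)
    assert snap_count == len(U)
    return modes, svals
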
def dist_vectorarray_hapod(num_slices, U, eps, omega, product=None, executor=None):
    """Distributed Hierarchical Approximate POD.

    This computes the distributed HAPOD from [HLR18]_ of a given |VectorArray|.

    Parameters
    ----------
    num_slices
        The number of snapshot vector slices.
    U
        The |VectorArray| of which to compute the HAPOD.
    eps
        Desired l2-mean approximation error.
    omega
        Tuning parameter (0 < omega < 1) to balance performance with
        approximation quality.
    product
        Inner product |Operator| w.r.t. which to compute the POD.
    executor
        If not `None`, a :class:`concurrent.futures.Executor` object to use
        for parallelization.

    Returns
    -------
    modes
        The computed POD modes.
    svals
        The associated singular values.
    snap_count
        The total number of input snapshot vectors.
    """
    chunk_size = ceil(len(U) / num_slices)
    slices = range(0, len(U), chunk_size)
    return dist_hapod(len(slices),
                      lambda i: U[slices[i]: slices[i]+chunk_size],
                      eps, omega, product=product, executor=executor)

def std_local_eps(tree, eps, omega, pod_on_leafs=True):

    L = tree.depth if pod_on_leafs else tree.depth - 1

    def local_eps(node, snap_count, input_count):
        if node == tree.root:
            return np.sqrt(snap_count) * omega * eps
        elif not pod_on_leafs and tree.is_leaf(node):
            return 0.
        else:
            return np.sqrt(snap_count) / np.sqrt(L - 1) * np.sqrt(1 - omega**2) * eps

    return local_eps

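# Worked example for the tolerance split above: for a DistHAPODTree
# (pod_on_leafs=True, so L = depth = 2) with eps = 1e-4 and omega = 0.9,
# a node with n snapshot vectors below it gets the local tolerance
# sqrt(n) * sqrt(1 - 0.9**2) * 1e-4 ≈ sqrt(n) * 4.36e-5 on the leaf level
# and sqrt(n) * 0.9 * 1e-4 at the root. Increasing omega thus shifts more
# of the error budget to the final POD, and vice versa.
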
class LifoExecutor:

    def __init__(self, executor, max_workers=None):
        self.executor = executor
        self.max_workers = max_workers or executor._max_workers
        self.queue = LifoQueue()
        self.loop = asyncio.get_event_loop()
        self.sem = asyncio.Semaphore(self.max_workers)

    def submit(self, f, *args):
        future = self.loop.create_future()
        self.queue.put((future, f, args))
        self.loop.create_task(self.run_task())
        return future

    async def run_task(self):
        # wait for a free worker, then hand the most recently queued task
        # to the wrapped executor
        await self.sem.acquire()
        future, f, args = self.queue.get()
        executor_future = self.loop.run_in_executor(self.executor, f, *args)
        executor_future.add_done_callback(lambda ef: self.done_callback(future, ef))

    def done_callback(self, future, executor_future):
        self.sem.release()
        future.set_result(executor_future.result())

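# A usage sketch (hypothetical): `hapod` wraps the user-supplied executor in
# `LifoExecutor` internally, but the wrapper can also be driven by hand.
# With two workers and five submissions, the most recently submitted tasks
# are picked up first, while `gather` still returns results in submission
# order.
def _lifo_executor_example():
    from concurrent.futures import ThreadPoolExecutor

    async def main(pool):
        executor = LifoExecutor(pool)
        futures = [executor.submit(pow, 2, k) for k in range(5)]
        return await asyncio.gather(*futures)    # [1, 2, 4, 8, 16]

    with ThreadPoolExecutor(max_workers=2) as pool:
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(main(pool))
        finally:
            loop.close()
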
class FakeExecutor:

    @staticmethod
    async def submit(f, *args):
        return f(*args)