# This file is part of the pyMOR project (http://www.pymor.org).
# Copyright 2013-2020 pyMOR developers and contributors. All rights reserved.
# License: BSD 2-Clause License (http://opensource.org/licenses/BSD-2-Clause)
from itertools import chain
import os
import time
from pymor.core.base import BasicObject
from pymor.core.config import config
from pymor.parallel.basic import WorkerPoolBase
from pymor.tools.counter import Counter

if config.HAVE_IPYTHON:
    try:
        from ipyparallel import Client, TimeoutError
    except ImportError:
        from IPython.parallel import Client, TimeoutError


class new_ipcluster_pool(BasicObject):
    """Create a new IPython parallel cluster and connect to it.

    This context manager can be used to create an :class:`IPythonPool`
    |WorkerPool|. When entering the context, a new IPython cluster is
    created using the `ipcluster` script and an :class:`IPythonPool`
    is instantiated for the newly created cluster. When leaving
    the context, the cluster is shut down.

    Parameters
    ----------
    profile
        Passed as `--profile` parameter to the `ipcluster` script.
    cluster_id
        Passed as `--cluster-id` parameter to the `ipcluster` script.
    num_engines
        Passed as `--n` parameter to the `ipcluster` script.
    ipython_dir
        Passed as `--ipython-dir` parameter to the `ipcluster` script.
    min_wait
        Wait at least this many seconds before trying to connect to the
        new cluster.
    timeout
        Wait at most this many seconds for all IPython cluster engines to
        become available.
    """

    def __init__(self, profile=None, cluster_id=None, num_engines=None, ipython_dir=None, min_wait=1, timeout=60):
        self.__auto_init(locals())

    def __enter__(self):
        args = []
        if self.profile is not None:
            args.append('--profile=' + self.profile)
        if self.cluster_id is not None:
            args.append('--cluster-id=' + self.cluster_id)
        if self.num_engines is not None:
            args.append('--n=' + str(self.num_engines))
        if self.ipython_dir is not None:
            args.append('--ipython-dir=' + self.ipython_dir)
        cmd = ' '.join(['ipcluster start --daemonize'] + args)
        self.logger.info('Starting IPython cluster with "' + cmd + '"')
        os.system(cmd)

        num_engines, timeout = self.num_engines, self.timeout
        time.sleep(self.min_wait)
        waited = self.min_wait
        client = None
        while client is None:
            try:
                client = Client(profile=self.profile, cluster_id=self.cluster_id)
            except (IOError, TimeoutError):
                if waited >= self.timeout:
                    raise IOError('Could not connect to IPython cluster controller')
                if waited % 10 == 0:
                    self.logger.info('Waiting for controller to start ...')
                time.sleep(1)
                waited += 1

        if num_engines is None:
            while len(client) == 0 and waited < timeout:
                if waited % 10 == 0:
                    self.logger.info('Waiting for engines to start ...')
                time.sleep(1)
                waited += 1
            if len(client) == 0:
                raise IOError('IPython cluster engines failed to start')
            wait = min(waited, timeout - waited)
            if wait > 0:
                self.logger.info(f'Waiting {wait} more seconds for engines to start ...')
                time.sleep(wait)
        else:
            running = len(client)
            while running < num_engines and waited < timeout:
                if waited % 10 == 0:
                    self.logger.info(f'Waiting for {num_engines-running} of {num_engines} engines to start ...')
                time.sleep(1)
                waited += 1
                running = len(client)
            if running < num_engines:
                raise IOError(f'{num_engines-running} of {num_engines} IPython cluster engines failed to start')

        # make sure all (potential) engines are in the same cwd, so they can import the same code
        client[:].apply_sync(os.chdir, os.getcwd())
        client.close()
        self.pool = IPythonPool(profile=self.profile, cluster_id=self.cluster_id)
        return self.pool

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.pool.client.close()
        args = []
        if self.profile is not None:
            args.append('--profile=' + self.profile)
        if self.cluster_id is not None:
            args.append('--cluster-id=' + self.cluster_id)
        if self.ipython_dir is not None:
            args.append('--ipython-dir=' + self.ipython_dir)
        cmd = ' '.join(['ipcluster stop'] + args)
        self.logger.info('Stopping IPython cluster with "' + cmd + '"')
        os.system(cmd)


class IPythonPool(WorkerPoolBase):
    """|WorkerPool| based on the IPython parallel computing features.

    Parameters
    ----------
    num_engines
        Number of IPython engines to use. If `None`, all available
        engines are used.
    kwargs
        Keyword arguments used to instantiate the IPython cluster client.
    """

    def __init__(self, num_engines=None, **kwargs):
        super().__init__()
        self.client = Client(**kwargs)
        if num_engines is not None:
            self.view = self.client[:num_engines]
        else:
            self.view = self.client[:]
        self.logger.info(f'Connected to {len(self.view)} engines')
        self.view.map_sync(_setup_worker, range(len(self.view)))
        self._remote_objects_created = Counter()

    def __len__(self):
        return len(self.view)

    def _push_object(self, obj):
        remote_id = RemoteId(self._remote_objects_created.inc())
        self.view.apply_sync(_push_object, remote_id, obj)
        return remote_id

    def _apply(self, function, *args, **kwargs):
        # loop=False: each engine calls `function` once with the given args
        return self.view.apply_sync(_worker_call_function, function, False, args, kwargs)

    def _apply_only(self, function, worker, *args, **kwargs):
        view = self.client[int(worker)]
        return view.apply_sync(_worker_call_function, function, False, args, kwargs)

    def _map(self, function, chunks, **kwargs):
        # build one (function, loop, args, kwargs) tuple per engine and transpose,
        # so that map_sync distributes exactly one chunk to each engine
        result = self.view.map_sync(_worker_call_function,
                                    *zip(*((function, True, a, kwargs) for a in zip(*chunks))))
        return list(chain(*result))

    def _remove_object(self, remote_id):
        self.view.apply(_remove_object, remote_id)


# integer handle identifying an object stored in each worker's _remote_objects dict
class RemoteId(int):
    pass


def _worker_call_function(function, loop, args, kwargs):
    global _remote_objects
    # resolve RemoteId handles in kwargs to the objects stored on this worker
    kwargs = {k: (_remote_objects[v] if isinstance(v, RemoteId) else  # NOQA
                  v)
              for k, v in kwargs.items()}
    if loop:
        # map mode: args is a tuple of equally long sequences, one call per entry
        return [function(*a, **kwargs) for a in zip(*args)]
    else:
        return function(*args, **kwargs)


def _setup_worker(worker_id):
    global _remote_objects
    _remote_objects = {}
    # ensure that each worker starts with a different RandomState
    from pymor.tools import random
    import numpy as np
    state = random.default_random_state()
    new_state = np.random.RandomState(state.randint(0, 2**16) + worker_id)
    random._default_random_state = new_state


def _push_object(remote_id, obj):
    global _remote_objects
    _remote_objects[remote_id] = obj  # NOQA


def _remove_object(remote_id):
    global _remote_objects
    del _remote_objects[remote_id]  # NOQA
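

# Illustrative sketch of the remote-object protocol implemented by the helpers
# above (`pool`, `f` and `data` are hypothetical; user code would normally go
# through the public |WorkerPool| methods rather than these underscore methods):
#
#     rid = pool._push_object(data)   # every engine stores `data` under `rid`
#     pool._apply(f, x=rid)           # on each engine, `rid` in kwargs is resolved
#                                     # to the stored object before `f` is called
#     pool._remove_object(rid)        # every engine drops its copy of the object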