Allow performing actions on instances in bulk

Change-Id: I9ceee57c405a2f195d6e344a5c4cfa0fcfe7556f
This commit is contained in:
Joshua Harlow 2015-07-01 15:18:36 -07:00
parent f1f488db56
commit 8ff479c46a
8 changed files with 143 additions and 24 deletions

View File

@ -206,8 +206,85 @@ class Action(object):
raise ValueError("Phase name must not be empty") raise ValueError("Phase name must not be empty")
return sh.joinpths(self.phase_dir, "%s.phases" % (phase_name)) return sh.joinpths(self.phase_dir, "%s.phases" % (phase_name))
def _run_many_phase(self, functors, group, instances, phase_name, *inv_phase_names):
"""Run a given 'functor' across all of the components, passing *all* instances to run."""
# This phase recorder will be used to check if a given component
# and action has ran in the past, if so that components action
# will not be ran again. It will also be used to mark that a given
# component has completed a phase (if that phase runs).
if not phase_name:
phase_recorder = phase.NullPhaseRecorder()
else:
phase_recorder = phase.PhaseRecorder(self._get_phase_filename(phase_name))
# These phase recorders will be used to undo other actions activities
# ie, when an install completes you want the uninstall phase to be
# removed from that actions phase file (and so on). This list will be
# used to accomplish that.
neg_phase_recs = []
if inv_phase_names:
for n in inv_phase_names:
if not n:
neg_phase_recs.append(phase.NullPhaseRecorder())
else:
neg_phase_recs.append(phase.PhaseRecorder(self._get_phase_filename(n)))
def change_activate(instance, on_off):
# Activate/deactivate a component instance and there siblings (if any)
#
# This is used when you say are looking at components
# that have been activated before your component has been.
#
# Typically this is useful for checking if a previous component
# has a shared dependency with your component and if so then there
# is no need to reinstall said dependency...
instance.activated = on_off
for (_name, sibling_instance) in instance.siblings.items():
sibling_instance.activated = on_off
def run_inverse_recorders(c_name):
for n in neg_phase_recs:
n.unmark(c_name)
# Reset all activations
for c, instance in six.iteritems(instances):
change_activate(instance, False)
# Run all components which have not been ran previously (due to phase tracking)
instances_started = utils.OrderedDict()
for c, instance in six.iteritems(instances):
if c in SPECIAL_GROUPS:
c = "%s_%s" % (c, group)
if c in phase_recorder:
LOG.debug("Skipping phase named %r for component %r since it already happened.", phase_name, c)
else:
try:
with phase_recorder.mark(c):
if functors.start:
functors.start(instance)
instances_started[c] = instance
except excp.NoTraceException:
pass
if functors.run:
results = functors.run(list(six.itervalues(instances_started)))
else:
results = [None] * len(instances_started)
instances_ran = instances_started
for i, (c, instance) in enumerate(six.iteritems(instances_ran)):
result = results[i]
try:
with phase_recorder.mark(c):
if functors.end:
functors.end(instance, result)
except excp.NoTraceException:
pass
for c, instance in six.iteritems(instances_ran):
change_activate(instance, True)
run_inverse_recorders(c)
def _run_phase(self, functors, group, instances, phase_name, *inv_phase_names): def _run_phase(self, functors, group, instances, phase_name, *inv_phase_names):
"""Run a given 'functor' across all of the components, in order.""" """Run a given 'functor' across all of the components, in order individually."""
# This phase recorder will be used to check if a given component # This phase recorder will be used to check if a given component
# and action has ran in the past, if so that components action # and action has ran in the past, if so that components action

View File

@ -67,18 +67,34 @@ class PrepareAction(action.Action):
) )
dependency_handler.package_start() dependency_handler.package_start()
removals.extend(states.reverts("package")) removals.extend(states.reverts("package"))
try: if not hasattr(dependency_handler, 'package_instances'):
self._run_phase( try:
action.PhaseFunctors( self._run_phase(
start=lambda i: LOG.info("Packaging %s.", colorizer.quote(i.name)), action.PhaseFunctors(
run=dependency_handler.package_instance, start=lambda i: LOG.info("Packaging %s.", colorizer.quote(i.name)),
end=None, run=dependency_handler.package_instance,
), end=None,
group, ),
instances, group,
"package", instances,
*removals "package",
) *removals
finally: )
dependency_handler.package_finish() finally:
dependency_handler.package_finish()
else:
try:
self._run_many_phase(
action.PhaseFunctors(
start=lambda i: LOG.info("Packaging %s.", colorizer.quote(i.name)),
run=dependency_handler.package_instances,
end=None,
),
group,
instances,
"package",
*removals
)
finally:
dependency_handler.package_finish()
prior_groups.append((group, instances)) prior_groups.append((group, instances))

View File

@ -21,6 +21,8 @@ import os
import re import re
import tarfile import tarfile
from concurrent import futures
import futurist
import six import six
from anvil import colorizer from anvil import colorizer
@ -58,6 +60,11 @@ class VenvDependencyHandler(base.DependencyHandler):
instances, opts, group, instances, opts, group,
prior_groups) prior_groups)
self.cache_dir = sh.joinpths(self.root_dir, "pip-cache") self.cache_dir = sh.joinpths(self.root_dir, "pip-cache")
self.jobs = max(0, int(opts.get('jobs', 0)))
if self.jobs >= 1:
self.executor = futurist.ThreadPoolExecutor(max_workers=self.jobs)
else:
self.executor = futurist.SynchronousExecutor()
def _venv_directory_for(self, instance): def _venv_directory_for(self, instance):
return sh.joinpths(instance.get_option('component_dir'), 'venv') return sh.joinpths(instance.get_option('component_dir'), 'venv')
@ -155,7 +162,23 @@ class VenvDependencyHandler(base.DependencyHandler):
if self._PREQ_PKGS: if self._PREQ_PKGS:
self._install_into_venv(instance, self._PREQ_PKGS) self._install_into_venv(instance, self._PREQ_PKGS)
def package_instance(self, instance): def package_instances(self, instances):
if not instances:
return []
LOG.info("Packaging %s instances using %s jobs",
len(instances), self.jobs)
fs = []
all_requires_what = self._filter_download_requires()
for instance in instances:
fs.append(self.executor.submit(self._package_instance,
instance, all_requires_what))
futures.wait(fs)
results = []
for f in fs:
results.append(f.result())
return results
def _package_instance(self, instance, all_requires_what):
if not self._is_buildable(instance): if not self._is_buildable(instance):
# Skip things that aren't python... # Skip things that aren't python...
LOG.warn("Skipping building %s (not python)", LOG.warn("Skipping building %s (not python)",
@ -172,7 +195,7 @@ class VenvDependencyHandler(base.DependencyHandler):
extra_reqs.append(pip_helper.create_requirement(p)) extra_reqs.append(pip_helper.create_requirement(p))
return extra_reqs return extra_reqs
all_requires_what = self._filter_download_requires() LOG.info("Packaging %s", colorizer.quote(instance.name))
all_requires_mapping = {} all_requires_mapping = {}
for req in all_requires_what: for req in all_requires_what:
if isinstance(req, six.string_types): if isinstance(req, six.string_types):
@ -210,10 +233,11 @@ class VenvDependencyHandler(base.DependencyHandler):
if req.key in all_requires_mapping: if req.key in all_requires_mapping:
req = all_requires_mapping[req.key] req = all_requires_mapping[req.key]
requires_what.append(req) requires_what.append(req)
utils.time_it(functools.partial(_on_finish, "Dependency installation"), what = 'installation for %s' % colorizer.quote(instance.name)
utils.time_it(functools.partial(_on_finish, "Dependency %s" % what),
self._install_into_venv, instance, self._install_into_venv, instance,
requires_what) requires_what)
utils.time_it(functools.partial(_on_finish, "Instance installation"), utils.time_it(functools.partial(_on_finish, "Instance %s" % what),
self._install_into_venv, instance, self._install_into_venv, instance,
[instance.get_option('app_dir')]) [instance.get_option('app_dir')])

View File

@ -278,6 +278,7 @@ def retry(attempts, delay, func, *args, **kwargs):
return func(*args, **kwargs) return func(*args, **kwargs)
except Exception: except Exception:
failures.append(sys.exc_info()) failures.append(sys.exc_info())
LOG.exception("Calling '%s' failed", func_name)
if attempt < max_attempts and delay > 0: if attempt < max_attempts and delay > 0:
LOG.info("Waiting %s seconds before calling '%s' again", LOG.info("Waiting %s seconds before calling '%s' again",
delay, func_name) delay, func_name)

View File

@ -62,7 +62,7 @@ load-plugins=
# W0622: Redefining id is fine. # W0622: Redefining id is fine.
# W0702: No exception type(s) specified # W0702: No exception type(s) specified
# W0703: Catching "Exception" is fine if you need it # W0703: Catching "Exception" is fine if you need it
disable=I0011,I0012,I0013,C0111,E0213,E0611,E1002,E1101,E1103,F0401,R0201,R0801,R0912,R0914,R0921,R0922,W0141,W0142,W0212,W0223,W0232,W0401,W0511,W0603,W0613,W0622,W0702,W0703 disable=I0011,I0012,I0013,C0111,E0213,E0611,E1002,E1101,E1103,F0401,R0201,R0801,R0912,R0914,R0921,R0922,W0141,W0142,W0212,W0223,W0232,W0401,W0511,W0603,W0613,W0622,W0702,W0703,C0325,C0330,W1401
[REPORTS] [REPORTS]

View File

@ -14,4 +14,6 @@ PyYAML>=3.1.0
six>=1.4.1 six>=1.4.1
termcolor termcolor
argparse argparse
futurist>=0.1.1
futures
jsonpatch>=1.1 jsonpatch>=1.1

View File

@ -1,5 +1,5 @@
pylint==0.25.2 pylint==1.4.1
hacking>=0.8.0,<0.9 hacking>=0.10.0,<0.11
mock>=1.0 mock>=1.0
nose nose
testtools>=0.9.34 testtools>=0.9.34

View File

@ -39,7 +39,7 @@ commands = bash -c "find {toxinidir} \
commands = {posargs} commands = {posargs}
[flake8] [flake8]
ignore = H102,H302,E501 ignore = H102,H302,E501,H405,H236,F812,H104,E265
builtins = _ builtins = _
exclude = .venv,.tox,dist,doc,*egg,.git,build exclude = .venv,.tox,dist,doc,*egg,.git,build
@ -49,4 +49,3 @@ detailed-errors = 1
[testenv:docs] [testenv:docs]
commands = python setup.py build_sphinx commands = python setup.py build_sphinx