Do not use VolumeTasks in AWS Instance create
Use volume progress objects instead (as in volume attachment resources). Module volume_tasks and class scheduler.PollingTaskGroup have been removed, as they are no longer used anywhere. Change-Id: Iff3b92f28d89ad1fdfd22511bb531be463855ccd Partial-Bug: #1393268
This commit is contained in:
parent
4d910cbaa2
commit
4f9371cfc0
@ -21,12 +21,12 @@ from heat.common import exception
|
||||
from heat.common.i18n import _
|
||||
from heat.common.i18n import _LI
|
||||
from heat.engine import attributes
|
||||
from heat.engine.clients.os import cinder as cinder_cp
|
||||
from heat.engine.clients.os import nova as nova_cp
|
||||
from heat.engine import constraints
|
||||
from heat.engine import properties
|
||||
from heat.engine import resource
|
||||
from heat.engine import scheduler
|
||||
from heat.engine import volume_tasks as vol_task
|
||||
|
||||
cfg.CONF.import_opt('instance_user', 'heat.common.config')
|
||||
cfg.CONF.import_opt('stack_scheduler_hints', 'heat.common.config')
|
||||
@ -577,23 +577,15 @@ class Instance(resource.Resource):
|
||||
if server is not None:
|
||||
self.resource_id_set(server.id)
|
||||
|
||||
if self.volumes():
|
||||
attacher = scheduler.TaskRunner(self._attach_volumes_task())
|
||||
else:
|
||||
attacher = None
|
||||
creator = nova_cp.ServerCreateProgress(server.id)
|
||||
return creator, attacher
|
||||
|
||||
def _attach_volumes_task(self):
|
||||
attach_tasks = (vol_task.VolumeAttachTask(self.stack,
|
||||
self.resource_id,
|
||||
volume_id,
|
||||
device)
|
||||
for volume_id, device in self.volumes())
|
||||
return scheduler.PollingTaskGroup(attach_tasks)
|
||||
attachers = []
|
||||
for vol_id, device in self.volumes():
|
||||
attachers.append(cinder_cp.VolumeAttachProgress(self.resource_id,
|
||||
vol_id, device))
|
||||
return creator, tuple(attachers)
|
||||
|
||||
def check_create_complete(self, cookie):
|
||||
creator, attacher = cookie
|
||||
creator, attachers = cookie
|
||||
|
||||
if not creator.complete:
|
||||
creator.complete = self.client_plugin()._check_active(
|
||||
@ -601,17 +593,30 @@ class Instance(resource.Resource):
|
||||
if creator.complete:
|
||||
server = self.client_plugin().get_server(creator.server_id)
|
||||
self._set_ipaddress(server.networks)
|
||||
return attacher is None
|
||||
# NOTE(pas-ha) small optimization,
|
||||
# return True if there are no volumes to attach
|
||||
# to save one check_create_complete call
|
||||
return not len(attachers)
|
||||
else:
|
||||
return False
|
||||
return self._check_volume_attached(attacher)
|
||||
return self._attach_volumes(attachers)
|
||||
|
||||
def _check_volume_attached(self, volume_attach_task):
|
||||
if not volume_attach_task.started():
|
||||
volume_attach_task.start()
|
||||
return volume_attach_task.done()
|
||||
else:
|
||||
return volume_attach_task.step()
|
||||
def _attach_volumes(self, attachers):
|
||||
for attacher in attachers:
|
||||
if not attacher.called:
|
||||
self.client_plugin().attach_volume(attacher.srv_id,
|
||||
attacher.vol_id,
|
||||
attacher.device)
|
||||
attacher.called = True
|
||||
return False
|
||||
|
||||
for attacher in attachers:
|
||||
if not attacher.complete:
|
||||
attacher.complete = self.client_plugin(
|
||||
'cinder').check_attach_volume_complete(attacher.vol_id)
|
||||
break
|
||||
out = all(attacher.complete for attacher in attachers)
|
||||
return out
|
||||
|
||||
def volumes(self):
|
||||
"""
|
||||
|
@ -11,14 +11,11 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import functools
|
||||
import itertools
|
||||
import sys
|
||||
import types
|
||||
|
||||
import eventlet
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import encodeutils
|
||||
from oslo_utils import excutils
|
||||
import six
|
||||
from six import reraise as raise_
|
||||
@ -440,100 +437,3 @@ class DependencyTaskGroup(object):
|
||||
"""
|
||||
running = lambda k_r: k_r[0] in self._graph and k_r[1].started()
|
||||
return six.moves.filter(running, six.iteritems(self._runners))
|
||||
|
||||
|
||||
class PollingTaskGroup(object):
|
||||
"""
|
||||
A task which manages a group of subtasks.
|
||||
|
||||
When the task is started, all of its subtasks are also started. The task
|
||||
completes when all subtasks are complete.
|
||||
|
||||
Once started, the subtasks are assumed to be only polling for completion
|
||||
of an asynchronous operation, so no attempt is made to give them equal
|
||||
scheduling slots.
|
||||
"""
|
||||
|
||||
def __init__(self, tasks, name=None):
|
||||
"""Initialise with a list of tasks."""
|
||||
self._tasks = list(tasks)
|
||||
if name is None:
|
||||
name = ', '.join(task_description(t) for t in self._tasks)
|
||||
self.name = name
|
||||
|
||||
@staticmethod
|
||||
def _args(arg_lists):
|
||||
"""Return a list containing the positional args for each subtask."""
|
||||
return zip(*arg_lists)
|
||||
|
||||
@staticmethod
|
||||
def _kwargs(kwarg_lists):
|
||||
"""Return a list containing the keyword args for each subtask."""
|
||||
keygroups = (six.moves.zip(itertools.repeat(name),
|
||||
arglist)
|
||||
for name, arglist in six.iteritems(kwarg_lists))
|
||||
return [dict(kwargs) for kwargs in six.moves.zip(*keygroups)]
|
||||
|
||||
@classmethod
|
||||
def from_task_with_args(cls, task, *arg_lists, **kwarg_lists):
|
||||
"""
|
||||
Return a new PollingTaskGroup where each subtask is identical except
|
||||
for the arguments passed to it.
|
||||
|
||||
Each argument to use should be passed as a list (or iterable) of values
|
||||
such that one is passed in the corresponding position for each subtask.
|
||||
The number of subtasks spawned depends on the length of the argument
|
||||
lists.
|
||||
For example::
|
||||
|
||||
PollingTaskGroup.from_task_with_args(my_task,
|
||||
[1, 2, 3],
|
||||
alpha=['a', 'b', 'c'])
|
||||
|
||||
will start three TaskRunners that will run::
|
||||
|
||||
my_task(1, alpha='a')
|
||||
my_task(2, alpha='b')
|
||||
my_task(3, alpha='c')
|
||||
|
||||
respectively.
|
||||
|
||||
If multiple arguments are supplied, each list should be of the same
|
||||
length. In the case of any discrepancy, the length of the shortest
|
||||
argument list will be used, and any extra arguments discarded.
|
||||
"""
|
||||
|
||||
args_list = cls._args(arg_lists)
|
||||
kwargs_list = cls._kwargs(kwarg_lists)
|
||||
|
||||
if kwarg_lists and not arg_lists:
|
||||
args_list = [[]] * len(kwargs_list)
|
||||
elif arg_lists and not kwarg_lists:
|
||||
kwargs_list = [{}] * len(args_list)
|
||||
|
||||
task_args = six.moves.zip(args_list, kwargs_list)
|
||||
tasks = (functools.partial(task, *a, **kwa) for a, kwa in task_args)
|
||||
|
||||
return cls(tasks, name=task_description(task))
|
||||
|
||||
def __repr__(self):
|
||||
"""Return a string representation of the task group."""
|
||||
text = '%s(%s)' % (type(self).__name__, self.name)
|
||||
return encodeutils.safe_encode(text)
|
||||
|
||||
def __call__(self):
|
||||
"""Return a co-routine which runs the task group."""
|
||||
runners = [TaskRunner(t) for t in self._tasks]
|
||||
|
||||
try:
|
||||
for r in runners:
|
||||
r.start()
|
||||
|
||||
while runners:
|
||||
yield
|
||||
runners = list(itertools.dropwhile(lambda r: r.step(),
|
||||
runners))
|
||||
except: # noqa
|
||||
with excutils.save_and_reraise_exception():
|
||||
for r in runners:
|
||||
r.cancel()
|
||||
|
@ -1,79 +0,0 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from heat.common.i18n import _
|
||||
from heat.common.i18n import _LI
|
||||
from heat.engine import resource
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VolumeAttachTask(object):
|
||||
"""A task for attaching a volume to a Nova server."""
|
||||
|
||||
def __init__(self, stack, server_id, volume_id, device):
|
||||
"""
|
||||
Initialise with the stack (for obtaining the clients), ID of the
|
||||
server and volume, and the device name on the server.
|
||||
"""
|
||||
self.clients = stack.clients
|
||||
self.server_id = server_id
|
||||
self.volume_id = volume_id
|
||||
self.device = device
|
||||
self.attachment_id = None
|
||||
|
||||
def __str__(self):
|
||||
"""Return a human-readable string description of the task."""
|
||||
return 'Attaching Volume %s to Instance %s as %s' % (self.volume_id,
|
||||
self.server_id,
|
||||
self.device)
|
||||
|
||||
def __repr__(self):
|
||||
"""Return a brief string description of the task."""
|
||||
return '%s(%s -> %s [%s])' % (type(self).__name__,
|
||||
self.volume_id,
|
||||
self.server_id,
|
||||
self.device)
|
||||
|
||||
def __call__(self):
|
||||
"""Return a co-routine which runs the task."""
|
||||
LOG.debug(str(self))
|
||||
|
||||
va = self.clients.client('nova').volumes.create_server_volume(
|
||||
server_id=self.server_id,
|
||||
volume_id=self.volume_id,
|
||||
device=self.device)
|
||||
self.attachment_id = va.id
|
||||
yield
|
||||
|
||||
cinder = self.clients.client('cinder')
|
||||
|
||||
vol = cinder.volumes.get(self.volume_id)
|
||||
while vol.status == 'available' or vol.status == 'attaching':
|
||||
LOG.debug('%(name)s - volume status: %(status)s'
|
||||
% {'name': str(self), 'status': vol.status})
|
||||
yield
|
||||
vol = cinder.volumes.get(self.volume_id)
|
||||
|
||||
if vol.status != 'in-use':
|
||||
LOG.info(_LI("Attachment failed - volume %(vol)s "
|
||||
"is in %(status)s status"),
|
||||
{"vol": vol.id,
|
||||
"status": vol.status})
|
||||
raise resource.ResourceUnknownStatus(
|
||||
resource_status=vol.status,
|
||||
result=_('Volume attachment failed'))
|
||||
|
||||
LOG.info(_LI('%s - complete'), str(self))
|
@ -65,6 +65,15 @@ wp_template = '''
|
||||
"DeviceName": "vdb",
|
||||
"Ebs": {"SnapshotId": "9ef5496e-7426-446a-bbc8-01f84d9c9972",
|
||||
"DeleteOnTermination": "True"}
|
||||
}],
|
||||
"Volumes" : [
|
||||
{
|
||||
"Device": "/dev/vdc",
|
||||
"VolumeId": "cccc"
|
||||
},
|
||||
{
|
||||
"Device": "/dev/vdd",
|
||||
"VolumeId": "dddd"
|
||||
}]
|
||||
}
|
||||
}
|
||||
@ -95,20 +104,24 @@ class InstancesTest(common.HeatTestCase):
|
||||
self.m.StubOutWithMock(glance.GlanceClientPlugin, 'get_image_id')
|
||||
glance.GlanceClientPlugin.get_image_id(image_id).AndRaise(exp)
|
||||
|
||||
def _get_test_template(self, stack_name, image_id=None):
|
||||
def _get_test_template(self, stack_name, image_id=None, volumes=False):
|
||||
(tmpl, stack) = self._setup_test_stack(stack_name)
|
||||
|
||||
tmpl.t['Resources']['WebServer']['Properties'][
|
||||
'ImageId'] = image_id or 'CentOS 5.2'
|
||||
tmpl.t['Resources']['WebServer']['Properties'][
|
||||
'InstanceType'] = '256 MB Server'
|
||||
if not volumes:
|
||||
tmpl.t['Resources']['WebServer']['Properties']['Volumes'] = []
|
||||
|
||||
return tmpl, stack
|
||||
|
||||
def _setup_test_instance(self, return_server, name, image_id=None,
|
||||
stub_create=True, stub_complete=False):
|
||||
stub_create=True, stub_complete=False,
|
||||
volumes=False):
|
||||
stack_name = '%s_s' % name
|
||||
tmpl, self.stack = self._get_test_template(stack_name, image_id)
|
||||
tmpl, self.stack = self._get_test_template(stack_name, image_id,
|
||||
volumes=volumes)
|
||||
resource_defns = tmpl.resource_definitions(self.stack)
|
||||
instance = instances.Instance(name, resource_defns['WebServer'],
|
||||
self.stack)
|
||||
@ -262,6 +275,7 @@ class InstancesTest(common.HeatTestCase):
|
||||
|
||||
self._mock_get_image_id_success('F17-x86_64-gold', 1)
|
||||
self.stub_SnapshotConstraint_validate()
|
||||
self.stub_VolumeConstraint_validate()
|
||||
self.m.StubOutWithMock(nova.NovaClientPlugin, '_create')
|
||||
nova.NovaClientPlugin._create().MultipleTimes().AndReturn(self.fc)
|
||||
|
||||
@ -277,6 +291,7 @@ class InstancesTest(common.HeatTestCase):
|
||||
bdm = [{'DeviceName': 'vdb'}]
|
||||
wsp = tmpl.t['Resources']['WebServer']['Properties']
|
||||
wsp['BlockDeviceMappings'] = bdm
|
||||
wsp['Volumes'] = []
|
||||
resource_defns = tmpl.resource_definitions(stack)
|
||||
instance = instances.Instance('validate_without_Ebs',
|
||||
resource_defns['WebServer'], stack)
|
||||
@ -301,6 +316,7 @@ class InstancesTest(common.HeatTestCase):
|
||||
'Ebs': {'VolumeSize': '1'}}]
|
||||
wsp = tmpl.t['Resources']['WebServer']['Properties']
|
||||
wsp['BlockDeviceMappings'] = bdm
|
||||
wsp['Volumes'] = []
|
||||
resource_defns = tmpl.resource_definitions(stack)
|
||||
instance = instances.Instance('validate_without_SnapshotId',
|
||||
resource_defns['WebServer'], stack)
|
||||
@ -603,6 +619,7 @@ class InstancesTest(common.HeatTestCase):
|
||||
nova.NovaClientPlugin._create().AndReturn(self.fc)
|
||||
|
||||
self._mock_get_image_id_success('1', 1)
|
||||
self.stub_VolumeConstraint_validate()
|
||||
self.stub_SnapshotConstraint_validate()
|
||||
self.m.ReplayAll()
|
||||
|
||||
@ -1049,6 +1066,7 @@ class InstancesTest(common.HeatTestCase):
|
||||
|
||||
scheduler.TaskRunner(instance.create)()
|
||||
self.assertEqual((instance.CREATE, instance.COMPLETE), instance.state)
|
||||
self.m.VerifyAll()
|
||||
|
||||
def _test_instance_status_suspend(self, name,
|
||||
state=('CREATE', 'COMPLETE')):
|
||||
@ -1444,3 +1462,32 @@ class InstancesTest(common.HeatTestCase):
|
||||
self.m.ReplayAll()
|
||||
scheduler.TaskRunner(instance.create)()
|
||||
self.m.VerifyAll()
|
||||
|
||||
def test_instance_create_with_volumes(self):
|
||||
return_server = self.fc.servers.list()[1]
|
||||
self.stub_VolumeConstraint_validate()
|
||||
instance = self._setup_test_instance(return_server,
|
||||
'with_volumes',
|
||||
stub_complete=True,
|
||||
volumes=True)
|
||||
attach_mock = self.patchobject(nova.NovaClientPlugin, 'attach_volume',
|
||||
side_effect=['cccc', 'dddd'])
|
||||
check_attach_mock = self.patchobject(cinder.CinderClientPlugin,
|
||||
'check_attach_volume_complete',
|
||||
side_effect=[False, True,
|
||||
False, True])
|
||||
self.m.ReplayAll()
|
||||
|
||||
scheduler.TaskRunner(instance.create)()
|
||||
self.assertEqual((instance.CREATE, instance.COMPLETE), instance.state)
|
||||
self.assertEqual(2, attach_mock.call_count)
|
||||
attach_mock.assert_has_calls([mock.call(instance.resource_id,
|
||||
'cccc', '/dev/vdc'),
|
||||
mock.call(instance.resource_id,
|
||||
'dddd', '/dev/vdd')])
|
||||
self.assertEqual(4, check_attach_mock.call_count)
|
||||
check_attach_mock.assert_has_calls([mock.call('cccc'),
|
||||
mock.call('cccc'),
|
||||
mock.call('dddd'),
|
||||
mock.call('dddd')])
|
||||
self.m.VerifyAll()
|
||||
|
@ -34,134 +34,6 @@ class DummyTask(object):
|
||||
pass
|
||||
|
||||
|
||||
class PollingTaskGroupTest(common.HeatTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(PollingTaskGroupTest, self).setUp()
|
||||
self.addCleanup(self.m.VerifyAll)
|
||||
|
||||
def test_group(self):
|
||||
tasks = [DummyTask() for i in range(3)]
|
||||
for t in tasks:
|
||||
self.m.StubOutWithMock(t, 'do_step')
|
||||
|
||||
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
|
||||
scheduler.TaskRunner._sleep(0).AndReturn(None)
|
||||
|
||||
for t in tasks:
|
||||
t.do_step(1).AndReturn(None)
|
||||
for t in tasks:
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
t.do_step(2).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
t.do_step(3).AndReturn(None)
|
||||
|
||||
self.m.ReplayAll()
|
||||
|
||||
tg = scheduler.PollingTaskGroup(tasks)
|
||||
scheduler.TaskRunner(tg)()
|
||||
|
||||
def test_kwargs(self):
|
||||
input_kwargs = {'i': [0, 1, 2],
|
||||
'i2': [0, 1, 4]}
|
||||
|
||||
output_kwargs = scheduler.PollingTaskGroup._kwargs(input_kwargs)
|
||||
|
||||
expected_kwargs = [{'i': 0, 'i2': 0},
|
||||
{'i': 1, 'i2': 1},
|
||||
{'i': 2, 'i2': 4}]
|
||||
|
||||
self.assertEqual(expected_kwargs, list(output_kwargs))
|
||||
|
||||
def test_kwargs_short(self):
|
||||
input_kwargs = {'i': [0, 1, 2],
|
||||
'i2': [0]}
|
||||
|
||||
output_kwargs = scheduler.PollingTaskGroup._kwargs(input_kwargs)
|
||||
|
||||
expected_kwargs = [{'i': 0, 'i2': 0}]
|
||||
|
||||
self.assertEqual(expected_kwargs, list(output_kwargs))
|
||||
|
||||
def test_no_kwargs(self):
|
||||
output_kwargs = scheduler.PollingTaskGroup._kwargs({})
|
||||
self.assertEqual([], list(output_kwargs))
|
||||
|
||||
def test_args(self):
|
||||
input_args = ([0, 1, 2],
|
||||
[0, 1, 4])
|
||||
|
||||
output_args = scheduler.PollingTaskGroup._args(input_args)
|
||||
|
||||
expected_args = [(0, 0), (1, 1), (2, 4)]
|
||||
|
||||
self.assertEqual(expected_args, list(output_args))
|
||||
|
||||
def test_args_short(self):
|
||||
input_args = ([0, 1, 2],
|
||||
[0])
|
||||
|
||||
output_args = scheduler.PollingTaskGroup._args(input_args)
|
||||
|
||||
expected_args = [(0, 0)]
|
||||
|
||||
self.assertEqual(expected_args, list(output_args))
|
||||
|
||||
def test_no_args(self):
|
||||
output_args = scheduler.PollingTaskGroup._args([])
|
||||
self.assertEqual([], list(output_args))
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _args_test(self, *arg_lists, **kwarg_lists):
|
||||
dummy = DummyTask(1)
|
||||
|
||||
tg = scheduler.PollingTaskGroup.from_task_with_args(dummy,
|
||||
*arg_lists,
|
||||
**kwarg_lists)
|
||||
|
||||
self.m.StubOutWithMock(dummy, 'do_step')
|
||||
yield dummy
|
||||
|
||||
self.m.ReplayAll()
|
||||
scheduler.TaskRunner(tg)(wait_time=None)
|
||||
|
||||
def test_with_all_args(self):
|
||||
with self._args_test([0, 1, 2], [0, 1, 8],
|
||||
i=[0, 1, 2], i2=[0, 1, 4]) as dummy:
|
||||
for i in range(3):
|
||||
dummy.do_step(1, i, i * i * i, i=i, i2=i * i)
|
||||
|
||||
def test_with_short_args(self):
|
||||
with self._args_test([0, 1, 2], [0, 1],
|
||||
i=[0, 1, 2], i2=[0, 1, 4]) as dummy:
|
||||
for i in range(2):
|
||||
dummy.do_step(1, i, i * i, i=i, i2=i * i)
|
||||
|
||||
def test_with_short_kwargs(self):
|
||||
with self._args_test([0, 1, 2], [0, 1, 8],
|
||||
i=[0, 1], i2=[0, 1, 4]) as dummy:
|
||||
for i in range(2):
|
||||
dummy.do_step(1, i, i * i, i=i, i2=i * i)
|
||||
|
||||
def test_with_empty_args(self):
|
||||
with self._args_test([], i=[0, 1, 2], i2=[0, 1, 4]):
|
||||
pass
|
||||
|
||||
def test_with_empty_kwargs(self):
|
||||
with self._args_test([0, 1, 2], [0, 1, 8], i=[]):
|
||||
pass
|
||||
|
||||
def test_with_no_args(self):
|
||||
with self._args_test(i=[0, 1, 2], i2=[0, 1, 4]) as dummy:
|
||||
for i in range(3):
|
||||
dummy.do_step(1, i=i, i2=i * i)
|
||||
|
||||
def test_with_no_kwargs(self):
|
||||
with self._args_test([0, 1, 2], [0, 1, 4]) as dummy:
|
||||
for i in range(3):
|
||||
dummy.do_step(1, i, i * i)
|
||||
|
||||
|
||||
class ExceptionGroupTest(common.HeatTestCase):
|
||||
|
||||
def test_contains_exceptions(self):
|
||||
|
Loading…
Reference in New Issue
Block a user