diff --git a/heat/engine/resources/aws/ec2/instance.py b/heat/engine/resources/aws/ec2/instance.py index 3b573afc4..27d8a330a 100644 --- a/heat/engine/resources/aws/ec2/instance.py +++ b/heat/engine/resources/aws/ec2/instance.py @@ -21,12 +21,12 @@ from heat.common import exception from heat.common.i18n import _ from heat.common.i18n import _LI from heat.engine import attributes +from heat.engine.clients.os import cinder as cinder_cp from heat.engine.clients.os import nova as nova_cp from heat.engine import constraints from heat.engine import properties from heat.engine import resource from heat.engine import scheduler -from heat.engine import volume_tasks as vol_task cfg.CONF.import_opt('instance_user', 'heat.common.config') cfg.CONF.import_opt('stack_scheduler_hints', 'heat.common.config') @@ -577,23 +577,15 @@ class Instance(resource.Resource): if server is not None: self.resource_id_set(server.id) - if self.volumes(): - attacher = scheduler.TaskRunner(self._attach_volumes_task()) - else: - attacher = None creator = nova_cp.ServerCreateProgress(server.id) - return creator, attacher - - def _attach_volumes_task(self): - attach_tasks = (vol_task.VolumeAttachTask(self.stack, - self.resource_id, - volume_id, - device) - for volume_id, device in self.volumes()) - return scheduler.PollingTaskGroup(attach_tasks) + attachers = [] + for vol_id, device in self.volumes(): + attachers.append(cinder_cp.VolumeAttachProgress(self.resource_id, + vol_id, device)) + return creator, tuple(attachers) def check_create_complete(self, cookie): - creator, attacher = cookie + creator, attachers = cookie if not creator.complete: creator.complete = self.client_plugin()._check_active( @@ -601,17 +593,30 @@ class Instance(resource.Resource): if creator.complete: server = self.client_plugin().get_server(creator.server_id) self._set_ipaddress(server.networks) - return attacher is None + # NOTE(pas-ha) small optimization, + # return True if there are no volumes to attach + # to save one check_create_complete call + return not len(attachers) else: return False - return self._check_volume_attached(attacher) + return self._attach_volumes(attachers) - def _check_volume_attached(self, volume_attach_task): - if not volume_attach_task.started(): - volume_attach_task.start() - return volume_attach_task.done() - else: - return volume_attach_task.step() + def _attach_volumes(self, attachers): + for attacher in attachers: + if not attacher.called: + self.client_plugin().attach_volume(attacher.srv_id, + attacher.vol_id, + attacher.device) + attacher.called = True + return False + + for attacher in attachers: + if not attacher.complete: + attacher.complete = self.client_plugin( + 'cinder').check_attach_volume_complete(attacher.vol_id) + break + out = all(attacher.complete for attacher in attachers) + return out def volumes(self): """ diff --git a/heat/engine/scheduler.py b/heat/engine/scheduler.py index af046f583..edc34e582 100644 --- a/heat/engine/scheduler.py +++ b/heat/engine/scheduler.py @@ -11,14 +11,11 @@ # License for the specific language governing permissions and limitations # under the License. -import functools -import itertools import sys import types import eventlet from oslo_log import log as logging -from oslo_utils import encodeutils from oslo_utils import excutils import six from six import reraise as raise_ @@ -440,100 +437,3 @@ class DependencyTaskGroup(object): """ running = lambda k_r: k_r[0] in self._graph and k_r[1].started() return six.moves.filter(running, six.iteritems(self._runners)) - - -class PollingTaskGroup(object): - """ - A task which manages a group of subtasks. - - When the task is started, all of its subtasks are also started. The task - completes when all subtasks are complete. - - Once started, the subtasks are assumed to be only polling for completion - of an asynchronous operation, so no attempt is made to give them equal - scheduling slots. - """ - - def __init__(self, tasks, name=None): - """Initialise with a list of tasks.""" - self._tasks = list(tasks) - if name is None: - name = ', '.join(task_description(t) for t in self._tasks) - self.name = name - - @staticmethod - def _args(arg_lists): - """Return a list containing the positional args for each subtask.""" - return zip(*arg_lists) - - @staticmethod - def _kwargs(kwarg_lists): - """Return a list containing the keyword args for each subtask.""" - keygroups = (six.moves.zip(itertools.repeat(name), - arglist) - for name, arglist in six.iteritems(kwarg_lists)) - return [dict(kwargs) for kwargs in six.moves.zip(*keygroups)] - - @classmethod - def from_task_with_args(cls, task, *arg_lists, **kwarg_lists): - """ - Return a new PollingTaskGroup where each subtask is identical except - for the arguments passed to it. - - Each argument to use should be passed as a list (or iterable) of values - such that one is passed in the corresponding position for each subtask. - The number of subtasks spawned depends on the length of the argument - lists. - For example:: - - PollingTaskGroup.from_task_with_args(my_task, - [1, 2, 3], - alpha=['a', 'b', 'c']) - - will start three TaskRunners that will run:: - - my_task(1, alpha='a') - my_task(2, alpha='b') - my_task(3, alpha='c') - - respectively. - - If multiple arguments are supplied, each list should be of the same - length. In the case of any discrepancy, the length of the shortest - argument list will be used, and any extra arguments discarded. - """ - - args_list = cls._args(arg_lists) - kwargs_list = cls._kwargs(kwarg_lists) - - if kwarg_lists and not arg_lists: - args_list = [[]] * len(kwargs_list) - elif arg_lists and not kwarg_lists: - kwargs_list = [{}] * len(args_list) - - task_args = six.moves.zip(args_list, kwargs_list) - tasks = (functools.partial(task, *a, **kwa) for a, kwa in task_args) - - return cls(tasks, name=task_description(task)) - - def __repr__(self): - """Return a string representation of the task group.""" - text = '%s(%s)' % (type(self).__name__, self.name) - return encodeutils.safe_encode(text) - - def __call__(self): - """Return a co-routine which runs the task group.""" - runners = [TaskRunner(t) for t in self._tasks] - - try: - for r in runners: - r.start() - - while runners: - yield - runners = list(itertools.dropwhile(lambda r: r.step(), - runners)) - except: # noqa - with excutils.save_and_reraise_exception(): - for r in runners: - r.cancel() diff --git a/heat/engine/volume_tasks.py b/heat/engine/volume_tasks.py deleted file mode 100644 index afe709ce8..000000000 --- a/heat/engine/volume_tasks.py +++ /dev/null @@ -1,79 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_log import log as logging - -from heat.common.i18n import _ -from heat.common.i18n import _LI -from heat.engine import resource - -LOG = logging.getLogger(__name__) - - -class VolumeAttachTask(object): - """A task for attaching a volume to a Nova server.""" - - def __init__(self, stack, server_id, volume_id, device): - """ - Initialise with the stack (for obtaining the clients), ID of the - server and volume, and the device name on the server. - """ - self.clients = stack.clients - self.server_id = server_id - self.volume_id = volume_id - self.device = device - self.attachment_id = None - - def __str__(self): - """Return a human-readable string description of the task.""" - return 'Attaching Volume %s to Instance %s as %s' % (self.volume_id, - self.server_id, - self.device) - - def __repr__(self): - """Return a brief string description of the task.""" - return '%s(%s -> %s [%s])' % (type(self).__name__, - self.volume_id, - self.server_id, - self.device) - - def __call__(self): - """Return a co-routine which runs the task.""" - LOG.debug(str(self)) - - va = self.clients.client('nova').volumes.create_server_volume( - server_id=self.server_id, - volume_id=self.volume_id, - device=self.device) - self.attachment_id = va.id - yield - - cinder = self.clients.client('cinder') - - vol = cinder.volumes.get(self.volume_id) - while vol.status == 'available' or vol.status == 'attaching': - LOG.debug('%(name)s - volume status: %(status)s' - % {'name': str(self), 'status': vol.status}) - yield - vol = cinder.volumes.get(self.volume_id) - - if vol.status != 'in-use': - LOG.info(_LI("Attachment failed - volume %(vol)s " - "is in %(status)s status"), - {"vol": vol.id, - "status": vol.status}) - raise resource.ResourceUnknownStatus( - resource_status=vol.status, - result=_('Volume attachment failed')) - - LOG.info(_LI('%s - complete'), str(self)) diff --git a/heat/tests/aws/test_instance.py b/heat/tests/aws/test_instance.py index e3b141aec..eefb41c99 100644 --- a/heat/tests/aws/test_instance.py +++ b/heat/tests/aws/test_instance.py @@ -65,6 +65,15 @@ wp_template = ''' "DeviceName": "vdb", "Ebs": {"SnapshotId": "9ef5496e-7426-446a-bbc8-01f84d9c9972", "DeleteOnTermination": "True"} + }], + "Volumes" : [ + { + "Device": "/dev/vdc", + "VolumeId": "cccc" + }, + { + "Device": "/dev/vdd", + "VolumeId": "dddd" }] } } @@ -95,20 +104,24 @@ class InstancesTest(common.HeatTestCase): self.m.StubOutWithMock(glance.GlanceClientPlugin, 'get_image_id') glance.GlanceClientPlugin.get_image_id(image_id).AndRaise(exp) - def _get_test_template(self, stack_name, image_id=None): + def _get_test_template(self, stack_name, image_id=None, volumes=False): (tmpl, stack) = self._setup_test_stack(stack_name) tmpl.t['Resources']['WebServer']['Properties'][ 'ImageId'] = image_id or 'CentOS 5.2' tmpl.t['Resources']['WebServer']['Properties'][ 'InstanceType'] = '256 MB Server' + if not volumes: + tmpl.t['Resources']['WebServer']['Properties']['Volumes'] = [] return tmpl, stack def _setup_test_instance(self, return_server, name, image_id=None, - stub_create=True, stub_complete=False): + stub_create=True, stub_complete=False, + volumes=False): stack_name = '%s_s' % name - tmpl, self.stack = self._get_test_template(stack_name, image_id) + tmpl, self.stack = self._get_test_template(stack_name, image_id, + volumes=volumes) resource_defns = tmpl.resource_definitions(self.stack) instance = instances.Instance(name, resource_defns['WebServer'], self.stack) @@ -262,6 +275,7 @@ class InstancesTest(common.HeatTestCase): self._mock_get_image_id_success('F17-x86_64-gold', 1) self.stub_SnapshotConstraint_validate() + self.stub_VolumeConstraint_validate() self.m.StubOutWithMock(nova.NovaClientPlugin, '_create') nova.NovaClientPlugin._create().MultipleTimes().AndReturn(self.fc) @@ -277,6 +291,7 @@ class InstancesTest(common.HeatTestCase): bdm = [{'DeviceName': 'vdb'}] wsp = tmpl.t['Resources']['WebServer']['Properties'] wsp['BlockDeviceMappings'] = bdm + wsp['Volumes'] = [] resource_defns = tmpl.resource_definitions(stack) instance = instances.Instance('validate_without_Ebs', resource_defns['WebServer'], stack) @@ -301,6 +316,7 @@ class InstancesTest(common.HeatTestCase): 'Ebs': {'VolumeSize': '1'}}] wsp = tmpl.t['Resources']['WebServer']['Properties'] wsp['BlockDeviceMappings'] = bdm + wsp['Volumes'] = [] resource_defns = tmpl.resource_definitions(stack) instance = instances.Instance('validate_without_SnapshotId', resource_defns['WebServer'], stack) @@ -603,6 +619,7 @@ class InstancesTest(common.HeatTestCase): nova.NovaClientPlugin._create().AndReturn(self.fc) self._mock_get_image_id_success('1', 1) + self.stub_VolumeConstraint_validate() self.stub_SnapshotConstraint_validate() self.m.ReplayAll() @@ -1049,6 +1066,7 @@ class InstancesTest(common.HeatTestCase): scheduler.TaskRunner(instance.create)() self.assertEqual((instance.CREATE, instance.COMPLETE), instance.state) + self.m.VerifyAll() def _test_instance_status_suspend(self, name, state=('CREATE', 'COMPLETE')): @@ -1444,3 +1462,32 @@ class InstancesTest(common.HeatTestCase): self.m.ReplayAll() scheduler.TaskRunner(instance.create)() self.m.VerifyAll() + + def test_instance_create_with_volumes(self): + return_server = self.fc.servers.list()[1] + self.stub_VolumeConstraint_validate() + instance = self._setup_test_instance(return_server, + 'with_volumes', + stub_complete=True, + volumes=True) + attach_mock = self.patchobject(nova.NovaClientPlugin, 'attach_volume', + side_effect=['cccc', 'dddd']) + check_attach_mock = self.patchobject(cinder.CinderClientPlugin, + 'check_attach_volume_complete', + side_effect=[False, True, + False, True]) + self.m.ReplayAll() + + scheduler.TaskRunner(instance.create)() + self.assertEqual((instance.CREATE, instance.COMPLETE), instance.state) + self.assertEqual(2, attach_mock.call_count) + attach_mock.assert_has_calls([mock.call(instance.resource_id, + 'cccc', '/dev/vdc'), + mock.call(instance.resource_id, + 'dddd', '/dev/vdd')]) + self.assertEqual(4, check_attach_mock.call_count) + check_attach_mock.assert_has_calls([mock.call('cccc'), + mock.call('cccc'), + mock.call('dddd'), + mock.call('dddd')]) + self.m.VerifyAll() diff --git a/heat/tests/engine/test_scheduler.py b/heat/tests/engine/test_scheduler.py index 9a87032ab..64ef10862 100644 --- a/heat/tests/engine/test_scheduler.py +++ b/heat/tests/engine/test_scheduler.py @@ -34,134 +34,6 @@ class DummyTask(object): pass -class PollingTaskGroupTest(common.HeatTestCase): - - def setUp(self): - super(PollingTaskGroupTest, self).setUp() - self.addCleanup(self.m.VerifyAll) - - def test_group(self): - tasks = [DummyTask() for i in range(3)] - for t in tasks: - self.m.StubOutWithMock(t, 'do_step') - - self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') - scheduler.TaskRunner._sleep(0).AndReturn(None) - - for t in tasks: - t.do_step(1).AndReturn(None) - for t in tasks: - scheduler.TaskRunner._sleep(1).AndReturn(None) - t.do_step(2).AndReturn(None) - scheduler.TaskRunner._sleep(1).AndReturn(None) - t.do_step(3).AndReturn(None) - - self.m.ReplayAll() - - tg = scheduler.PollingTaskGroup(tasks) - scheduler.TaskRunner(tg)() - - def test_kwargs(self): - input_kwargs = {'i': [0, 1, 2], - 'i2': [0, 1, 4]} - - output_kwargs = scheduler.PollingTaskGroup._kwargs(input_kwargs) - - expected_kwargs = [{'i': 0, 'i2': 0}, - {'i': 1, 'i2': 1}, - {'i': 2, 'i2': 4}] - - self.assertEqual(expected_kwargs, list(output_kwargs)) - - def test_kwargs_short(self): - input_kwargs = {'i': [0, 1, 2], - 'i2': [0]} - - output_kwargs = scheduler.PollingTaskGroup._kwargs(input_kwargs) - - expected_kwargs = [{'i': 0, 'i2': 0}] - - self.assertEqual(expected_kwargs, list(output_kwargs)) - - def test_no_kwargs(self): - output_kwargs = scheduler.PollingTaskGroup._kwargs({}) - self.assertEqual([], list(output_kwargs)) - - def test_args(self): - input_args = ([0, 1, 2], - [0, 1, 4]) - - output_args = scheduler.PollingTaskGroup._args(input_args) - - expected_args = [(0, 0), (1, 1), (2, 4)] - - self.assertEqual(expected_args, list(output_args)) - - def test_args_short(self): - input_args = ([0, 1, 2], - [0]) - - output_args = scheduler.PollingTaskGroup._args(input_args) - - expected_args = [(0, 0)] - - self.assertEqual(expected_args, list(output_args)) - - def test_no_args(self): - output_args = scheduler.PollingTaskGroup._args([]) - self.assertEqual([], list(output_args)) - - @contextlib.contextmanager - def _args_test(self, *arg_lists, **kwarg_lists): - dummy = DummyTask(1) - - tg = scheduler.PollingTaskGroup.from_task_with_args(dummy, - *arg_lists, - **kwarg_lists) - - self.m.StubOutWithMock(dummy, 'do_step') - yield dummy - - self.m.ReplayAll() - scheduler.TaskRunner(tg)(wait_time=None) - - def test_with_all_args(self): - with self._args_test([0, 1, 2], [0, 1, 8], - i=[0, 1, 2], i2=[0, 1, 4]) as dummy: - for i in range(3): - dummy.do_step(1, i, i * i * i, i=i, i2=i * i) - - def test_with_short_args(self): - with self._args_test([0, 1, 2], [0, 1], - i=[0, 1, 2], i2=[0, 1, 4]) as dummy: - for i in range(2): - dummy.do_step(1, i, i * i, i=i, i2=i * i) - - def test_with_short_kwargs(self): - with self._args_test([0, 1, 2], [0, 1, 8], - i=[0, 1], i2=[0, 1, 4]) as dummy: - for i in range(2): - dummy.do_step(1, i, i * i, i=i, i2=i * i) - - def test_with_empty_args(self): - with self._args_test([], i=[0, 1, 2], i2=[0, 1, 4]): - pass - - def test_with_empty_kwargs(self): - with self._args_test([0, 1, 2], [0, 1, 8], i=[]): - pass - - def test_with_no_args(self): - with self._args_test(i=[0, 1, 2], i2=[0, 1, 4]) as dummy: - for i in range(3): - dummy.do_step(1, i=i, i2=i * i) - - def test_with_no_kwargs(self): - with self._args_test([0, 1, 2], [0, 1, 4]) as dummy: - for i in range(3): - dummy.do_step(1, i, i * i) - - class ExceptionGroupTest(common.HeatTestCase): def test_contains_exceptions(self):