diff --git a/sahara/service/direct_engine.py b/sahara/service/direct_engine.py index 96feaa3a..2824a6f6 100644 --- a/sahara/service/direct_engine.py +++ b/sahara/service/direct_engine.py @@ -28,6 +28,7 @@ from sahara.openstack.common import log as logging from sahara.service import engine as e from sahara.service import networks from sahara.service import volumes +from sahara.utils import cluster_progress_ops as cpo from sahara.utils import general as g from sahara.utils.openstack import neutron from sahara.utils.openstack import nova @@ -385,6 +386,10 @@ class DirectEngine(e.Engine): if not instances: return + cpo.add_provisioning_step( + cluster.id, _("Wait for instances to become active"), + len(instances)) + active_ids = set() while len(active_ids) != len(instances): if not g.check_cluster_exists(cluster): @@ -393,6 +398,7 @@ class DirectEngine(e.Engine): if instance.id not in active_ids: if self._check_if_active(instance): active_ids.add(instance.id) + cpo.add_successful_event(instance) context.sleep(1) @@ -402,6 +408,8 @@ class DirectEngine(e.Engine): """Await all instances are deleted.""" if not instances: return + cpo.add_provisioning_step( + cluster.id, _("Wait for instances to be deleted"), len(instances)) deleted_ids = set() while len(deleted_ids) != len(instances): @@ -413,9 +421,11 @@ class DirectEngine(e.Engine): LOG.debug("Instance '%s' is deleted" % instance.instance_name) deleted_ids.add(instance.id) + cpo.add_successful_event(instance) context.sleep(1) + @cpo.event_wrapper(mark_successful_on_exit=False) def _check_if_active(self, instance): server = nova.get_instance_info(instance) if server.status == 'ERROR': @@ -423,6 +433,7 @@ class DirectEngine(e.Engine): return server.status == 'ACTIVE' + @cpo.event_wrapper(mark_successful_on_exit=False) def _check_if_deleted(self, instance): try: nova.get_instance_info(instance) diff --git a/sahara/service/engine.py b/sahara/service/engine.py index f07f6585..f39c1304 100644 --- a/sahara/service/engine.py +++ b/sahara/service/engine.py @@ -22,16 +22,17 @@ import six from sahara import conductor as c from sahara import context +from sahara.i18n import _ from sahara.i18n import _LI from sahara.i18n import _LW from sahara.openstack.common import log as logging from sahara.service import networks +from sahara.utils import cluster_progress_ops as cpo from sahara.utils import edp from sahara.utils import general as g from sahara.utils.openstack import nova from sahara.utils import remote - LOG = logging.getLogger(__name__) conductor = c.API @@ -69,6 +70,8 @@ class Engine(object): if not instances: return + cpo.add_provisioning_step(cluster.id, _("Assign IPs"), len(instances)) + ips_assigned = set() while len(ips_assigned) != len(instances): if not g.check_cluster_exists(cluster): @@ -77,6 +80,7 @@ class Engine(object): if instance.id not in ips_assigned: if networks.init_instances_ips(instance): ips_assigned.add(instance.id) + cpo.add_successful_event(instance) context.sleep(1) @@ -86,6 +90,9 @@ class Engine(object): cluster = conductor.cluster_get(context.ctx(), cluster) instances = g.get_instances(cluster, ips_assigned) + cpo.add_provisioning_step( + cluster.id, _("Wait for instance accessibility"), len(instances)) + with context.ThreadGroup() as tg: for instance in instances: tg.spawn("wait-for-ssh-%s" % instance.instance_name, @@ -93,6 +100,7 @@ class Engine(object): LOG.info(_LI("Cluster '%s': all instances are accessible"), cluster.id) + @cpo.event_wrapper(mark_successful_on_exit=True) def _wait_until_accessible(self, instance): while True: try: @@ -122,6 +130,8 @@ class Engine(object): * etc. """ hosts_file = g.generate_etc_hosts(cluster) + cpo.add_provisioning_step( + cluster.id, _("Configure instances"), g.count_instances(cluster)) with context.ThreadGroup() as tg: for node_group in cluster.node_groups: @@ -129,6 +139,7 @@ class Engine(object): tg.spawn("configure-instance-%s" % instance.instance_name, self._configure_instance, instance, hosts_file) + @cpo.event_wrapper(mark_successful_on_exit=True) def _configure_instance(self, instance, hosts_file): LOG.debug('Configuring instance %s' % instance.instance_name) diff --git a/sahara/service/volumes.py b/sahara/service/volumes.py index b717e795..3c2bc6f8 100644 --- a/sahara/service/volumes.py +++ b/sahara/service/volumes.py @@ -23,6 +23,7 @@ from sahara.i18n import _ from sahara.i18n import _LE from sahara.i18n import _LW from sahara.openstack.common import log as logging +from sahara.utils import cluster_progress_ops as cpo from sahara.utils.openstack import cinder from sahara.utils.openstack import nova @@ -43,7 +44,27 @@ CONF.import_opt('api_version', 'sahara.utils.openstack.cinder', group='cinder') +def _count_instances_to_attach(instances): + result = 0 + for instance in instances: + if instance.node_group.volumes_per_node > 0: + result += 1 + return result + + +def _get_cluster_id(instance): + return instance.node_group.cluster_id + + def attach_to_instances(instances): + instances_to_attach = _count_instances_to_attach(instances) + if instances_to_attach == 0: + return + + cpo.add_provisioning_step( + _get_cluster_id(instances[0]), _("Attach volumes to instances"), + instances_to_attach) + with context.ThreadGroup() as tg: for instance in instances: if instance.node_group.volumes_per_node > 0: @@ -66,6 +87,7 @@ def _await_attach_volumes(instance, devices): instance.instance_name) +@cpo.event_wrapper(mark_successful_on_exit=True) def _attach_volumes_to_node(node_group, instance): ctx = context.ctx() size = node_group.volumes_size diff --git a/sahara/tests/unit/service/test_volumes.py b/sahara/tests/unit/service/test_volumes.py index 8a038c11..455c1cb0 100644 --- a/sahara/tests/unit/service/test_volumes.py +++ b/sahara/tests/unit/service/test_volumes.py @@ -82,14 +82,22 @@ class TestAttachVolume(base.SaharaWithDbTestCase): @mock.patch('sahara.service.volumes._mount_volume') @mock.patch('sahara.service.volumes._await_attach_volumes') @mock.patch('sahara.service.volumes._create_attach_volume') - def test_attach(self, p_create_attach_vol, p_await, p_mount): + @mock.patch('sahara.utils.cluster_progress_ops.add_successful_event') + @mock.patch('sahara.utils.cluster_progress_ops.update_provisioning_steps') + @mock.patch('sahara.utils.cluster_progress_ops.add_provisioning_step') + def test_attach(self, add_step, update_step, add_event, + p_create_attach_vol, p_await, p_mount): p_create_attach_vol.side_effect = ['/dev/vdb', '/dev/vdc'] * 2 p_await.return_value = None p_mount.return_value = None + add_event.return_value = None + update_step.return_value = None + add_step.return_value = None instance1 = {'id': '1', 'instance_id': '123', 'instance_name': 'inst_1'} + instance2 = {'id': '2', 'instance_id': '456', 'instance_name': 'inst_2'} @@ -100,6 +108,7 @@ class TestAttachVolume(base.SaharaWithDbTestCase): 'volume_mount_prefix': '/mnt/vols', 'volume_type': None, 'name': 'master', + 'cluster_id': '11', 'instances': [instance1, instance2]} cluster = r.ClusterResource({'node_groups': [ng]}) diff --git a/sahara/utils/general.py b/sahara/utils/general.py index 2e5c8392..8ec0cee0 100644 --- a/sahara/utils/general.py +++ b/sahara/utils/general.py @@ -98,6 +98,10 @@ def change_cluster_status(cluster, status, status_description=None): return cluster +def count_instances(cluster): + return sum([node_group.count for node_group in cluster.node_groups]) + + def check_cluster_exists(cluster): ctx = context.ctx() # check if cluster still exists (it might have been removed)