Add provisioning steps to Direct Engine

Change-Id: I4d1c75a9f0961da3d7f0b0c91b4f9ea89353d893
Implements: blueprint event-log
Vitaly Gridnev 2014-12-22 16:42:30 +03:00
parent b1c5f43a82
commit 3c07de686d
5 changed files with 59 additions and 2 deletions
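Every file in this change applies the same event-log pattern: register a provisioning step sized to the number of per-instance events that will complete it (`cpo.add_provisioning_step`), then record one successful event per instance, either explicitly via `cpo.add_successful_event(instance)` inside a polling loop or implicitly by decorating a per-instance helper with `@cpo.event_wrapper(mark_successful_on_exit=True)`. A minimal sketch of the pattern follows; it uses only the `cluster_progress_ops` calls visible in this diff, while `wait_for_active` and `_check` are hypothetical stand-ins for the engine methods:

```python
from sahara import context
from sahara.i18n import _
from sahara.utils import cluster_progress_ops as cpo


def wait_for_active(cluster, instances):
    """Hypothetical polling loop showing the provisioning-step pattern."""
    if not instances:
        return

    # One step per phase; the total tells the event log how many
    # successful events are needed to complete the step.
    cpo.add_provisioning_step(
        cluster.id, _("Wait for instances to become active"), len(instances))

    done = set()
    while len(done) != len(instances):
        for instance in instances:
            if instance.id not in done and _check(instance):
                done.add(instance.id)
                # One event per instance; the step finishes when the
                # event count reaches the total registered above.
                cpo.add_successful_event(instance)
        context.sleep(1)


# Polling checks pass mark_successful_on_exit=False: the loop above
# records success itself, so the wrapper must not double-count it.
@cpo.event_wrapper(mark_successful_on_exit=False)
def _check(instance):
    return True  # stand-in for a real status check (e.g. via nova)
```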

sahara/service/direct_engine.py

@@ -28,6 +28,7 @@ from sahara.openstack.common import log as logging
 from sahara.service import engine as e
 from sahara.service import networks
 from sahara.service import volumes
+from sahara.utils import cluster_progress_ops as cpo
 from sahara.utils import general as g
 from sahara.utils.openstack import neutron
 from sahara.utils.openstack import nova
@@ -385,6 +386,10 @@ class DirectEngine(e.Engine):
         if not instances:
             return
 
+        cpo.add_provisioning_step(
+            cluster.id, _("Wait for instances to become active"),
+            len(instances))
+
         active_ids = set()
         while len(active_ids) != len(instances):
             if not g.check_cluster_exists(cluster):
@@ -393,6 +398,7 @@ class DirectEngine(e.Engine):
                 if instance.id not in active_ids:
                     if self._check_if_active(instance):
                         active_ids.add(instance.id)
+                        cpo.add_successful_event(instance)
 
             context.sleep(1)
 
@@ -402,6 +408,8 @@ class DirectEngine(e.Engine):
         """Await all instances are deleted."""
         if not instances:
             return
+        cpo.add_provisioning_step(
+            cluster.id, _("Wait for instances to be deleted"), len(instances))
 
         deleted_ids = set()
         while len(deleted_ids) != len(instances):
@@ -413,9 +421,11 @@ class DirectEngine(e.Engine):
                         LOG.debug("Instance '%s' is deleted" %
                                   instance.instance_name)
                         deleted_ids.add(instance.id)
+                        cpo.add_successful_event(instance)
 
             context.sleep(1)
 
+    @cpo.event_wrapper(mark_successful_on_exit=False)
     def _check_if_active(self, instance):
         server = nova.get_instance_info(instance)
         if server.status == 'ERROR':
@@ -423,6 +433,7 @@ class DirectEngine(e.Engine):
 
         return server.status == 'ACTIVE'
 
+    @cpo.event_wrapper(mark_successful_on_exit=False)
     def _check_if_deleted(self, instance):
         try:
             nova.get_instance_info(instance)

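Both `DirectEngine` polling loops above follow that shape: the step is registered up front with `len(instances)` as its total, and each instance contributes exactly one event once it reaches the target state. `_check_if_active` and `_check_if_deleted` take `mark_successful_on_exit=False` because the loop records success itself; presumably the wrapper still logs a failure event if the check raises.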
sahara/service/engine.py

@@ -22,16 +22,17 @@ import six
 from sahara import conductor as c
 from sahara import context
 from sahara.i18n import _
 from sahara.i18n import _LI
 from sahara.i18n import _LW
 from sahara.openstack.common import log as logging
 from sahara.service import networks
+from sahara.utils import cluster_progress_ops as cpo
 from sahara.utils import edp
 from sahara.utils import general as g
 from sahara.utils.openstack import nova
 from sahara.utils import remote
 
 
 LOG = logging.getLogger(__name__)
 
 conductor = c.API
@@ -69,6 +70,8 @@ class Engine(object):
         if not instances:
             return
 
+        cpo.add_provisioning_step(cluster.id, _("Assign IPs"), len(instances))
+
         ips_assigned = set()
         while len(ips_assigned) != len(instances):
             if not g.check_cluster_exists(cluster):
@@ -77,6 +80,7 @@ class Engine(object):
                 if instance.id not in ips_assigned:
                     if networks.init_instances_ips(instance):
                         ips_assigned.add(instance.id)
+                        cpo.add_successful_event(instance)
 
             context.sleep(1)
 
@@ -86,6 +90,9 @@ class Engine(object):
         cluster = conductor.cluster_get(context.ctx(), cluster)
         instances = g.get_instances(cluster, ips_assigned)
 
+        cpo.add_provisioning_step(
+            cluster.id, _("Wait for instance accessibility"), len(instances))
+
         with context.ThreadGroup() as tg:
             for instance in instances:
                 tg.spawn("wait-for-ssh-%s" % instance.instance_name,
@@ -93,6 +100,7 @@ class Engine(object):
 
         LOG.info(_LI("Cluster '%s': all instances are accessible"), cluster.id)
 
+    @cpo.event_wrapper(mark_successful_on_exit=True)
     def _wait_until_accessible(self, instance):
         while True:
             try:
@@ -122,6 +130,8 @@ class Engine(object):
         * etc.
         """
         hosts_file = g.generate_etc_hosts(cluster)
+        cpo.add_provisioning_step(
+            cluster.id, _("Configure instances"), g.count_instances(cluster))
 
         with context.ThreadGroup() as tg:
             for node_group in cluster.node_groups:
@@ -129,6 +139,7 @@ class Engine(object):
                 tg.spawn("configure-instance-%s" % instance.instance_name,
                          self._configure_instance, instance, hosts_file)
 
+    @cpo.event_wrapper(mark_successful_on_exit=True)
     def _configure_instance(self, instance, hosts_file):
         LOG.debug('Configuring instance %s' % instance.instance_name)
 

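In `Engine`, the per-instance helpers `_wait_until_accessible` and `_configure_instance` run in spawned threads, so they use `mark_successful_on_exit=True` and the wrapper records the event on normal return with no explicit `add_successful_event` call. Note that the "Configure instances" step is sized with `g.count_instances(cluster)` (every instance in every node group; the helper is added in `sahara/utils/general.py` below) rather than with a passed-in instance list, matching the one event `_configure_instance` emits per instance.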
sahara/service/volumes.py

@@ -23,6 +23,7 @@ from sahara.i18n import _
 from sahara.i18n import _LE
 from sahara.i18n import _LW
 from sahara.openstack.common import log as logging
+from sahara.utils import cluster_progress_ops as cpo
 from sahara.utils.openstack import cinder
 from sahara.utils.openstack import nova
 
@@ -43,7 +44,27 @@ CONF.import_opt('api_version', 'sahara.utils.openstack.cinder',
                 group='cinder')
 
 
+def _count_instances_to_attach(instances):
+    result = 0
+    for instance in instances:
+        if instance.node_group.volumes_per_node > 0:
+            result += 1
+    return result
+
+
+def _get_cluster_id(instance):
+    return instance.node_group.cluster_id
+
+
 def attach_to_instances(instances):
+    instances_to_attach = _count_instances_to_attach(instances)
+    if instances_to_attach == 0:
+        return
+
+    cpo.add_provisioning_step(
+        _get_cluster_id(instances[0]), _("Attach volumes to instances"),
+        instances_to_attach)
+
     with context.ThreadGroup() as tg:
         for instance in instances:
             if instance.node_group.volumes_per_node > 0:
@@ -66,6 +87,7 @@ def _await_attach_volumes(instance, devices):
                          instance.instance_name)
 
 
+@cpo.event_wrapper(mark_successful_on_exit=True)
 def _attach_volumes_to_node(node_group, instance):
     ctx = context.ctx()
     size = node_group.volumes_size

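The step total and the eventual event count must agree, which is why `attach_to_instances` sizes its step with `_count_instances_to_attach(instances)` rather than `len(instances)`: `_attach_volumes_to_node` is spawned, and therefore emits an event through its `mark_successful_on_exit=True` wrapper, only for instances whose node group has `volumes_per_node > 0`. For three instances of which two carry volumes, for example, the step total is 2 and the two wrapper-recorded events complete it; with no volume-bearing instances the function returns before registering a step at all.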
sahara/tests/unit/service/test_volumes.py

@@ -82,14 +82,22 @@ class TestAttachVolume(base.SaharaWithDbTestCase):
     @mock.patch('sahara.service.volumes._mount_volume')
     @mock.patch('sahara.service.volumes._await_attach_volumes')
     @mock.patch('sahara.service.volumes._create_attach_volume')
-    def test_attach(self, p_create_attach_vol, p_await, p_mount):
+    @mock.patch('sahara.utils.cluster_progress_ops.add_successful_event')
+    @mock.patch('sahara.utils.cluster_progress_ops.update_provisioning_steps')
+    @mock.patch('sahara.utils.cluster_progress_ops.add_provisioning_step')
+    def test_attach(self, add_step, update_step, add_event,
+                    p_create_attach_vol, p_await, p_mount):
         p_create_attach_vol.side_effect = ['/dev/vdb', '/dev/vdc'] * 2
         p_await.return_value = None
         p_mount.return_value = None
+        add_event.return_value = None
+        update_step.return_value = None
+        add_step.return_value = None
+
         instance1 = {'id': '1',
                      'instance_id': '123',
                      'instance_name': 'inst_1'}
         instance2 = {'id': '2',
                      'instance_id': '456',
                      'instance_name': 'inst_2'}
@@ -100,6 +108,7 @@ class TestAttachVolume(base.SaharaWithDbTestCase):
               'volume_mount_prefix': '/mnt/vols',
               'volume_type': None,
               'name': 'master',
+              'cluster_id': '11',
               'instances': [instance1, instance2]}
 
         cluster = r.ClusterResource({'node_groups': [ng]})

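The test mocks all three `cluster_progress_ops` entry points so that `attach_to_instances` can run without real event-log state, and the node-group fixture gains a `'cluster_id'` key because the new `_get_cluster_id` helper reads `instance.node_group.cluster_id` when registering the step.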
sahara/utils/general.py

@@ -98,6 +98,10 @@ def change_cluster_status(cluster, status, status_description=None):
     return cluster
 
 
+def count_instances(cluster):
+    return sum([node_group.count for node_group in cluster.node_groups])
+
+
 def check_cluster_exists(cluster):
     ctx = context.ctx()
     # check if cluster still exists (it might have been removed)