Merge "Move updating provision progress to conductor"
This commit is contained in:
commit
9f709c912a
@ -442,31 +442,24 @@ class LocalApi(object):
|
||||
# Events ops
|
||||
|
||||
def cluster_provision_step_add(self, context, cluster_id, values):
|
||||
"""Create a cluster assigned ProvisionStep
|
||||
|
||||
from the values dictionary
|
||||
"""
|
||||
"""Create a provisioning step assigned to cluster from values dict."""
|
||||
return self._manager.cluster_provision_step_add(
|
||||
context, cluster_id, values)
|
||||
|
||||
def cluster_provision_step_update(self, context, provision_step, values):
|
||||
"""Update the ProvisionStep from the values dictionary."""
|
||||
self._manager.cluster_provision_step_update(
|
||||
context, provision_step, values)
|
||||
|
||||
def cluster_provision_step_get_events(self, context, provision_step):
|
||||
"""Return all events from the specified ProvisionStep."""
|
||||
return self._manager.cluster_provision_step_get_events(
|
||||
def cluster_provision_step_update(self, context, provision_step):
|
||||
"""Update the cluster provisioning step."""
|
||||
return self._manager.cluster_provision_step_update(
|
||||
context, provision_step)
|
||||
|
||||
def cluster_provision_step_remove_events(self, context, provision_step):
|
||||
"""Delete all event from the specified ProvisionStep."""
|
||||
self._manager.cluster_provision_step_remove_events(
|
||||
context, provision_step)
|
||||
def cluster_provision_progress_update(self, context, cluster_id):
|
||||
"""Return cluster with provision progress updated field."""
|
||||
return self._manager.cluster_provision_progress_update(
|
||||
context, cluster_id)
|
||||
|
||||
def cluster_event_add(self, context, provision_step, values):
|
||||
"""Assign new event to the specified ProvisionStep."""
|
||||
self._manager.cluster_event_add(context, provision_step, values)
|
||||
"""Assign new event to the specified provision step."""
|
||||
return self._manager.cluster_event_add(
|
||||
context, provision_step, values)
|
||||
|
||||
|
||||
class RemoteApi(LocalApi):
|
||||
|
@ -473,25 +473,17 @@ class ConductorManager(db_base.Base):
|
||||
# Events ops
|
||||
|
||||
def cluster_provision_step_add(self, context, cluster_id, values):
|
||||
"""Create a cluster assigned ProvisionStep
|
||||
|
||||
from the values dictionary
|
||||
"""
|
||||
"""Create a provisioning step assigned to cluster from values dict."""
|
||||
return self.db.cluster_provision_step_add(context, cluster_id, values)
|
||||
|
||||
def cluster_provision_step_update(self, context, provision_step, values):
|
||||
"""Update the ProvisionStep from the values dictionary."""
|
||||
self.db.cluster_provision_step_update(context, provision_step, values)
|
||||
def cluster_provision_step_update(self, context, provision_step):
|
||||
"""Update the cluster provisioning step."""
|
||||
return self.db.cluster_provision_step_update(context, provision_step)
|
||||
|
||||
def cluster_provision_step_get_events(self, context, provision_step):
|
||||
"""Return all events from the specified ProvisionStep."""
|
||||
return self.db.cluster_provision_step_get_events(
|
||||
context, provision_step)
|
||||
|
||||
def cluster_provision_step_remove_events(self, context, provision_step):
|
||||
"""Delete all event from the specified ProvisionStep."""
|
||||
self.db.cluster_provision_step_remove_events(context, provision_step)
|
||||
def cluster_provision_progress_update(self, context, cluster_id):
|
||||
"""Return cluster with provision progress updated field."""
|
||||
return self.db.cluster_provision_progress_update(context, cluster_id)
|
||||
|
||||
def cluster_event_add(self, context, provision_step, values):
|
||||
"""Assign new event to the specified ProvisionStep."""
|
||||
self.db.cluster_event_add(context, provision_step, values)
|
||||
"""Assign new event to the specified provision step."""
|
||||
return self.db.cluster_event_add(context, provision_step, values)
|
||||
|
@ -293,11 +293,8 @@ class ClusterProvisionStep(object):
|
||||
tenant_id
|
||||
step_name
|
||||
step_type
|
||||
completed
|
||||
total
|
||||
successful
|
||||
started_at
|
||||
completed_at
|
||||
events - list of Events objects assigned to the cluster
|
||||
"""
|
||||
|
||||
|
@ -274,12 +274,13 @@ def sleep(seconds=0):
|
||||
|
||||
class InstanceInfo(object):
|
||||
def __init__(self, cluster_id=None, instance_id=None, instance_name=None,
|
||||
node_group_id=None, step_type=None):
|
||||
node_group_id=None, step_type=None, step_id=None):
|
||||
self.cluster_id = cluster_id
|
||||
self.instance_id = instance_id
|
||||
self.instance_name = instance_name
|
||||
self.node_group_id = node_group_id
|
||||
self.step_type = step_type
|
||||
self.step_id = step_id
|
||||
|
||||
|
||||
def set_step_type(step_type):
|
||||
@ -291,6 +292,8 @@ class InstanceInfoManager(object):
|
||||
self.prev_instance_info = current().current_instance_info
|
||||
if not instance_info.step_type:
|
||||
instance_info.step_type = self.prev_instance_info.step_type
|
||||
if not instance_info.step_id:
|
||||
instance_info.step_id = self.prev_instance_info.step_id
|
||||
current().current_instance_info = instance_info
|
||||
|
||||
def __enter__(self):
|
||||
|
@ -110,6 +110,9 @@ def to_dict(func):
|
||||
|
||||
def cluster_get(context, cluster, show_progress=False):
|
||||
"""Return the cluster or None if it does not exist."""
|
||||
if show_progress:
|
||||
cluster = IMPL.cluster_provision_progress_update(context, cluster)
|
||||
else:
|
||||
cluster = IMPL.cluster_get(context, cluster)
|
||||
if cluster:
|
||||
return cluster.to_dict(show_progress)
|
||||
@ -446,27 +449,24 @@ def job_binary_internal_get_raw_data(context, job_binary_internal_id):
|
||||
return IMPL.job_binary_internal_get_raw_data(context,
|
||||
job_binary_internal_id)
|
||||
|
||||
# Events ops
|
||||
|
||||
|
||||
def cluster_provision_step_add(context, cluster_id, values):
|
||||
"""Create a cluster assigned ProvisionStep from the values dictionary."""
|
||||
return IMPL.cluster_provision_step_add(context, cluster_id, values)
|
||||
|
||||
|
||||
def cluster_provision_step_update(context, provision_step, values):
|
||||
"""Update the ProvisionStep from the values dictionary."""
|
||||
IMPL.cluster_provision_step_update(context, provision_step, values)
|
||||
def cluster_provision_step_update(context, step_id):
|
||||
"""Updates provision step."""
|
||||
return IMPL.cluster_provision_step_update(context, step_id)
|
||||
|
||||
|
||||
def cluster_provision_step_get_events(context, provision_step):
|
||||
"""Return all events from the specified ProvisionStep."""
|
||||
return IMPL.cluster_provision_step_get_events(context, provision_step)
|
||||
|
||||
|
||||
def cluster_provision_step_remove_events(context, provision_step):
|
||||
"""Delete all event from the specified ProvisionStep."""
|
||||
IMPL.cluster_provision_step_remove_events(context, provision_step)
|
||||
def cluster_provision_progress_update(context, cluster_id):
|
||||
"""Return cluster with provision progress updated field."""
|
||||
return IMPL.cluster_provision_progress_update(context, cluster_id)
|
||||
|
||||
|
||||
def cluster_event_add(context, provision_step, values):
|
||||
"""Assign new event to the specified ProvisionStep."""
|
||||
IMPL.cluster_event_add(context, provision_step, values)
|
||||
"""Assign new event to the specified provision step."""
|
||||
return IMPL.cluster_event_add(context, provision_step, values)
|
||||
|
@ -0,0 +1,44 @@
|
||||
# Copyright 2015 OpenStack Foundation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""remove redandunt progress ops
|
||||
|
||||
Revision ID: 020
|
||||
Revises: 019
|
||||
Create Date: 2015-02-26 15:01:41.015076
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '020'
|
||||
down_revision = '019'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.drop_column('cluster_provision_steps', 'completed_at')
|
||||
op.drop_column('cluster_provision_steps', 'completed')
|
||||
op.drop_column('cluster_provision_steps', 'started_at')
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.add_column('cluster_provision_steps',
|
||||
sa.Column('completed', sa.Integer(), nullable=True))
|
||||
op.add_column('cluster_provision_steps',
|
||||
sa.Column('started_at', sa.DateTime(), nullable=True))
|
||||
op.add_column('cluster_provision_steps',
|
||||
sa.Column('completed_at', sa.DateTime(), nullable=True))
|
@ -1114,6 +1114,22 @@ def _cluster_provision_step_get(context, session, provision_step_id):
|
||||
return query.filter_by(id=provision_step_id).first()
|
||||
|
||||
|
||||
def _cluster_provision_step_update(context, session, step_id):
|
||||
step = _cluster_provision_step_get(context, session, step_id)
|
||||
|
||||
if step is None:
|
||||
raise ex.NotFoundException(
|
||||
step_id,
|
||||
_("Cluster Provision Step id '%s' not found!"))
|
||||
|
||||
if step.successful is not None:
|
||||
return
|
||||
if len(step.events) == step.total:
|
||||
for event in step.events:
|
||||
session.delete(event)
|
||||
step.update({'successful': True})
|
||||
|
||||
|
||||
def cluster_provision_step_add(context, cluster_id, values):
|
||||
session = get_session()
|
||||
|
||||
@ -1132,64 +1148,29 @@ def cluster_provision_step_add(context, cluster_id, values):
|
||||
return provision_step.id
|
||||
|
||||
|
||||
def cluster_provision_step_update(context, provision_step_id, values):
|
||||
session = get_session()
|
||||
|
||||
with session.begin():
|
||||
provision_step = _cluster_provision_step_get(
|
||||
context, session, provision_step_id)
|
||||
|
||||
if not provision_step:
|
||||
raise ex.NotFoundException(
|
||||
provision_step_id,
|
||||
_("Cluster Provision Step id '%s' not found!"))
|
||||
|
||||
provision_step.update(values)
|
||||
|
||||
|
||||
def cluster_provision_step_get_events(context, provision_step_id):
|
||||
def cluster_provision_step_update(context, step_id):
|
||||
if CONF.disable_event_log:
|
||||
return
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
provision_step = _cluster_provision_step_get(
|
||||
context, session, provision_step_id)
|
||||
|
||||
if not provision_step:
|
||||
raise ex.NotFoundException(
|
||||
provision_step_id,
|
||||
_("Cluster Provision Step id '%s' not found!"))
|
||||
|
||||
return provision_step.events
|
||||
_cluster_provision_step_update(context, session, step_id)
|
||||
|
||||
|
||||
def cluster_provision_step_remove_events(context, provision_step_id):
|
||||
def cluster_provision_progress_update(context, cluster_id):
|
||||
if CONF.disable_event_log:
|
||||
return _cluster_get(context, get_session(), cluster_id)
|
||||
session = get_session()
|
||||
|
||||
with session.begin():
|
||||
provision_step = _cluster_provision_step_get(
|
||||
context, session, provision_step_id)
|
||||
cluster = _cluster_get(context, session, cluster_id)
|
||||
|
||||
if not provision_step:
|
||||
raise ex.NotFoundException(
|
||||
provision_step_id,
|
||||
_("Cluster Provision Step id '%s' not found!"))
|
||||
|
||||
for event in provision_step.events:
|
||||
session.delete(event)
|
||||
|
||||
|
||||
def cluster_provision_step_remove(context, provision_step_id):
|
||||
session = get_session()
|
||||
cluster_provision_step_remove_events(context, provision_step_id)
|
||||
with session.begin():
|
||||
provision_step = _cluster_provision_step_get(
|
||||
context, session, provision_step_id)
|
||||
|
||||
if not provision_step:
|
||||
raise ex.NotFoundException(
|
||||
provision_step_id,
|
||||
_("Cluster Provision Step id '%s' not found!"))
|
||||
|
||||
session.delete(provision_step)
|
||||
if cluster is None:
|
||||
raise ex.NotFoundException(cluster_id,
|
||||
_("Cluster id '%s' not found!"))
|
||||
for step in cluster.provision_progress:
|
||||
if step.successful is None:
|
||||
_cluster_provision_step_update(context, session, step.id)
|
||||
result_cluster = _cluster_get(context, session, cluster_id)
|
||||
return result_cluster
|
||||
|
||||
|
||||
def cluster_event_add(context, step_id, values):
|
||||
@ -1206,6 +1187,8 @@ def cluster_event_add(context, step_id, values):
|
||||
|
||||
event = m.ClusterEvent()
|
||||
values['step_id'] = step_id
|
||||
if not values['successful']:
|
||||
provision_step.update({'successful': False})
|
||||
event.update(values)
|
||||
session.add(event)
|
||||
|
||||
|
@ -416,11 +416,8 @@ class ClusterProvisionStep(mb.SaharaBase):
|
||||
tenant_id = sa.Column(sa.String(36))
|
||||
step_name = sa.Column(sa.String(80))
|
||||
step_type = sa.Column(sa.String(36))
|
||||
completed = sa.Column(sa.Integer)
|
||||
total = sa.Column(sa.Integer)
|
||||
successful = sa.Column(sa.Boolean, nullable=True)
|
||||
started_at = sa.Column(sa.DateTime())
|
||||
completed_at = sa.Column(sa.DateTime())
|
||||
events = relationship('ClusterEvent', cascade="all,delete",
|
||||
backref='ClusterProvisionStep',
|
||||
lazy='joined')
|
||||
|
@ -147,6 +147,8 @@ def _make_periodic_tasks():
|
||||
continue
|
||||
|
||||
terminate_cluster(ctx, cluster, description='transient')
|
||||
# Add event log info cleanup
|
||||
context.ctx().current_instance_info = context.InstanceInfo()
|
||||
context.set_ctx(None)
|
||||
|
||||
@periodic_task.periodic_task(spacing=zombie_task_spacing)
|
||||
@ -185,7 +187,8 @@ def _make_periodic_tasks():
|
||||
continue
|
||||
|
||||
terminate_cluster(ctx, cluster, description='incomplete')
|
||||
|
||||
# Add event log info cleanup
|
||||
context.ctx().current_instance_info = context.InstanceInfo()
|
||||
context.set_ctx(None)
|
||||
|
||||
return SaharaPeriodicTasks()
|
||||
|
@ -13,8 +13,12 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
import testtools
|
||||
|
||||
from sahara import conductor
|
||||
from sahara import context
|
||||
from sahara import exceptions
|
||||
from sahara.tests.unit import base
|
||||
from sahara.utils import general as gu
|
||||
|
||||
@ -116,6 +120,19 @@ class TestConductorClusterApi(base.SaharaWithDbTestCase):
|
||||
ng = gu.get_by_id(cluster.node_groups, ng_id)
|
||||
self.assertEqual(ng.instances[0].instance_name, 'tst123')
|
||||
|
||||
def _get_events(self, ctx, cluster_id, step_id=None):
|
||||
cluster = self.api.cluster_get(ctx, cluster_id, show_progress=True)
|
||||
events = []
|
||||
for step in cluster.provision_progress:
|
||||
if step_id == step['id']:
|
||||
return step['events']
|
||||
else:
|
||||
events += step['events']
|
||||
if step_id:
|
||||
return events
|
||||
else:
|
||||
return []
|
||||
|
||||
def test_events_ops(self):
|
||||
ctx, cluster = self._make_sample()
|
||||
|
||||
@ -138,20 +155,6 @@ class TestConductorClusterApi(base.SaharaWithDbTestCase):
|
||||
self.assertEqual(st_type, provision_step['step_type'])
|
||||
self.assertEqual(cluster.id, provision_step['cluster_id'])
|
||||
|
||||
# test provision step updating
|
||||
|
||||
self.api.cluster_provision_step_update(ctx, step_id, {
|
||||
'total': 100,
|
||||
'completed': 59,
|
||||
})
|
||||
|
||||
ncluster = self.api.cluster_get(ctx, cluster.id)
|
||||
self.assertEqual(1, len(ncluster['provision_progress']))
|
||||
provision_step = ncluster['provision_progress'][0]
|
||||
|
||||
self.assertEqual(100, provision_step['total'])
|
||||
self.assertEqual(59, provision_step['completed'])
|
||||
|
||||
# test adding event to step and getting events from step
|
||||
|
||||
self.api.cluster_event_add(ctx, step_id, {
|
||||
@ -162,14 +165,13 @@ class TestConductorClusterApi(base.SaharaWithDbTestCase):
|
||||
'successful': True
|
||||
})
|
||||
|
||||
events = self.api.cluster_provision_step_get_events(ctx, step_id)
|
||||
events = self._get_events(ctx, cluster.id, step_id)
|
||||
self.assertEqual(1, len(events))
|
||||
self.assertEqual(st_name, events[0].instance_name)
|
||||
self.assertEqual(True, events[0].successful)
|
||||
self.assertEqual(st_info, events[0].event_info)
|
||||
|
||||
# test removing events from step
|
||||
self.api.cluster_destroy(ctx, cluster.id)
|
||||
|
||||
self.api.cluster_provision_step_remove_events(ctx, step_id)
|
||||
events = self.api.cluster_provision_step_get_events(ctx, step_id)
|
||||
self.assertEqual(0, len(events))
|
||||
with testtools.ExpectedException(exceptions.NotFoundException):
|
||||
self._get_events(ctx, cluster.id, step_id)
|
||||
|
@ -456,6 +456,14 @@ class SaharaMigrationsCheckers(object):
|
||||
self.assertColumnExists(engine, 'node_group_templates', 'is_default')
|
||||
self.assertColumnExists(engine, 'cluster_templates', 'is_default')
|
||||
|
||||
def _check_020(self, engine, data):
|
||||
self.assertColumnNotExists(engine, 'cluster_provision_steps',
|
||||
'completed')
|
||||
self.assertColumnNotExists(engine, 'cluster_provision_steps',
|
||||
'completed_at')
|
||||
self.assertColumnNotExists(engine, 'cluster_provision_steps',
|
||||
'started_at')
|
||||
|
||||
|
||||
class TestMigrationsMySQL(SaharaMigrationsCheckers,
|
||||
base.BaseWalkMigrationTestCase,
|
||||
|
@ -83,15 +83,13 @@ class TestAttachVolume(base.SaharaWithDbTestCase):
|
||||
@mock.patch('sahara.service.volumes._await_attach_volumes')
|
||||
@mock.patch('sahara.service.volumes._create_attach_volume')
|
||||
@mock.patch('sahara.utils.cluster_progress_ops.add_successful_event')
|
||||
@mock.patch('sahara.utils.cluster_progress_ops.update_provisioning_steps')
|
||||
@mock.patch('sahara.utils.cluster_progress_ops.add_provisioning_step')
|
||||
def test_attach(self, add_step, update_step, add_event,
|
||||
def test_attach(self, add_step, add_event,
|
||||
p_create_attach_vol, p_await, p_mount):
|
||||
p_create_attach_vol.side_effect = ['/dev/vdb', '/dev/vdc'] * 2
|
||||
p_await.return_value = None
|
||||
p_mount.return_value = None
|
||||
add_event.return_value = None
|
||||
update_step.return_value = None
|
||||
add_step.return_value = None
|
||||
|
||||
instance1 = {'id': '1',
|
||||
|
@ -54,7 +54,7 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
|
||||
"successful": True
|
||||
})
|
||||
|
||||
cpo.update_provisioning_steps(cluster.id)
|
||||
self.api.cluster_provision_progress_update(ctx, cluster.id)
|
||||
|
||||
# check that we have correct provision step
|
||||
|
||||
@ -62,7 +62,6 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
|
||||
result_step = result_cluster.provision_progress[0]
|
||||
|
||||
self.assertEqual(None, result_step.successful)
|
||||
self.assertEqual(1, result_step.completed)
|
||||
|
||||
# check updating in case of successful provision step
|
||||
|
||||
@ -71,13 +70,12 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
|
||||
"successful": True
|
||||
})
|
||||
|
||||
cpo.update_provisioning_steps(cluster.id)
|
||||
self.api.cluster_provision_progress_update(ctx, cluster.id)
|
||||
|
||||
result_cluster = self.api.cluster_get(ctx, cluster.id)
|
||||
result_step = result_cluster.provision_progress[0]
|
||||
|
||||
self.assertEqual(True, result_step.successful)
|
||||
self.assertEqual(2, result_step.completed)
|
||||
|
||||
# check updating in case of failed provision step
|
||||
|
||||
@ -91,7 +89,7 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
|
||||
"successful": False,
|
||||
})
|
||||
|
||||
cpo.update_provisioning_steps(cluster.id)
|
||||
self.api.cluster_provision_progress_update(ctx, cluster.id)
|
||||
|
||||
result_cluster = self.api.cluster_get(ctx, cluster.id)
|
||||
|
||||
@ -100,11 +98,7 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
|
||||
self.assertEqual(False, step.successful)
|
||||
|
||||
# check that it's possible to add provision step after failed step
|
||||
|
||||
step_id3 = self.api.cluster_provision_step_add(ctx, cluster.id, {
|
||||
"step_name": "some_name",
|
||||
"total": 2,
|
||||
})
|
||||
step_id3 = cpo.add_provisioning_step(cluster.id, "some_name", 2)
|
||||
|
||||
self.assertEqual(
|
||||
step_id3, cpo.get_current_provisioning_step(cluster.id))
|
||||
@ -114,9 +108,11 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
|
||||
|
||||
step_id1 = self.api.cluster_provision_step_add(ctx, cluster.id, {
|
||||
'step_name': "some_name1",
|
||||
'total': 3,
|
||||
})
|
||||
step_id2 = self.api.cluster_provision_step_add(ctx, cluster.id, {
|
||||
'step_name': "some_name",
|
||||
'total': 2,
|
||||
})
|
||||
|
||||
self.api.cluster_event_add(ctx, step_id1, {
|
||||
|
@ -39,6 +39,9 @@ CONF.register_opts(event_log_opts)
|
||||
|
||||
|
||||
def add_successful_event(instance):
|
||||
if CONF.disable_event_log:
|
||||
return
|
||||
|
||||
cluster_id = instance.cluster_id
|
||||
step_id = get_current_provisioning_step(cluster_id)
|
||||
if step_id:
|
||||
@ -49,10 +52,12 @@ def add_successful_event(instance):
|
||||
'instance_name': instance.instance_name,
|
||||
'event_info': None,
|
||||
})
|
||||
update_provisioning_steps(cluster_id)
|
||||
|
||||
|
||||
def add_fail_event(instance, exception):
|
||||
if CONF.disable_event_log:
|
||||
return
|
||||
|
||||
cluster_id = instance.cluster_id
|
||||
step_id = get_current_provisioning_step(cluster_id)
|
||||
event_info = six.text_type(exception)
|
||||
@ -65,16 +70,18 @@ def add_fail_event(instance, exception):
|
||||
'instance_name': instance.instance_name,
|
||||
'event_info': event_info,
|
||||
})
|
||||
update_provisioning_steps(cluster_id)
|
||||
|
||||
|
||||
def add_provisioning_step(cluster_id, step_name, total):
|
||||
if CONF.disable_event_log or not g.check_cluster_exists(cluster_id):
|
||||
return
|
||||
|
||||
update_provisioning_steps(cluster_id)
|
||||
prev_step = get_current_provisioning_step(cluster_id)
|
||||
if prev_step:
|
||||
conductor.cluster_provision_step_update(context.ctx(), prev_step)
|
||||
|
||||
step_type = context.ctx().current_instance_info.step_type
|
||||
return conductor.cluster_provision_step_add(
|
||||
new_step = conductor.cluster_provision_step_add(
|
||||
context.ctx(), cluster_id, {
|
||||
'step_name': step_name,
|
||||
'step_type': step_type,
|
||||
@ -82,64 +89,15 @@ def add_provisioning_step(cluster_id, step_name, total):
|
||||
'total': total,
|
||||
'started_at': timeutils.utcnow(),
|
||||
})
|
||||
context.current().current_instance_info.step_id = new_step
|
||||
return new_step
|
||||
|
||||
|
||||
def get_current_provisioning_step(cluster_id):
|
||||
if CONF.disable_event_log or not g.check_cluster_exists(cluster_id):
|
||||
return None
|
||||
|
||||
update_provisioning_steps(cluster_id)
|
||||
ctx = context.ctx()
|
||||
cluster = conductor.cluster_get(ctx, cluster_id)
|
||||
for step in cluster.provision_progress:
|
||||
if step.successful is not None:
|
||||
continue
|
||||
|
||||
return step.id
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def update_provisioning_steps(cluster_id):
|
||||
if CONF.disable_event_log or not g.check_cluster_exists(cluster_id):
|
||||
return
|
||||
|
||||
ctx = context.ctx()
|
||||
cluster = conductor.cluster_get(ctx, cluster_id)
|
||||
|
||||
for step in cluster.provision_progress:
|
||||
if step.successful is not None:
|
||||
continue
|
||||
|
||||
has_failed = False
|
||||
successful_events_count = 0
|
||||
events = conductor.cluster_provision_step_get_events(
|
||||
ctx, step.id)
|
||||
for event in events:
|
||||
if event.successful:
|
||||
successful_events_count += 1
|
||||
else:
|
||||
has_failed = True
|
||||
|
||||
successful = None
|
||||
if has_failed:
|
||||
successful = False
|
||||
elif successful_events_count == step.total:
|
||||
successful = True
|
||||
|
||||
completed_at = None
|
||||
if successful and not step.completed_at:
|
||||
completed_at = timeutils.utcnow()
|
||||
|
||||
conductor.cluster_provision_step_update(ctx, step.id, {
|
||||
'completed': successful_events_count,
|
||||
'successful': successful,
|
||||
'completed_at': completed_at,
|
||||
})
|
||||
|
||||
if successful:
|
||||
conductor.cluster_provision_step_remove_events(
|
||||
ctx, step.id)
|
||||
current_instance_info = context.ctx().current_instance_info
|
||||
return current_instance_info.step_id
|
||||
|
||||
|
||||
def event_wrapper(mark_successful_on_exit, **spec):
|
||||
|
@ -102,6 +102,7 @@ def change_cluster_status(cluster, status, status_description=None):
|
||||
update_dict["status_description"] = status_description
|
||||
|
||||
cluster = conductor.cluster_update(ctx, cluster, update_dict)
|
||||
conductor.cluster_provision_progress_update(ctx, cluster.id)
|
||||
|
||||
LOG.info(_LI("Cluster status has been changed: id={id}, New status="
|
||||
"{status}").format(id=cluster.id, status=cluster.status))
|
||||
|
Loading…
Reference in New Issue
Block a user