Move updating provision progress to conductor

Changes:
 * Move updating provision progress to conductor;
 * Remove redundant conductor ops;
 * Improve all updates operations.

This change would improve complexity of all event log operations
and decrease number of updates of provisioning steps.
Also we remove unneeded fields in base, because we can calculate
it very easy from UI.

partially implements: bp event-log

Change-Id: I63c593d5ba27879edc2142a1e2ab310628ee045c
This commit is contained in:
Vitaly Gridnev 2015-02-26 14:41:49 +03:00
parent 5caebca4ee
commit 7f2f40e646
15 changed files with 172 additions and 197 deletions

View File

@ -442,31 +442,24 @@ class LocalApi(object):
# Events ops
def cluster_provision_step_add(self, context, cluster_id, values):
"""Create a cluster assigned ProvisionStep
from the values dictionary
"""
"""Create a provisioning step assigned to cluster from values dict."""
return self._manager.cluster_provision_step_add(
context, cluster_id, values)
def cluster_provision_step_update(self, context, provision_step, values):
"""Update the ProvisionStep from the values dictionary."""
self._manager.cluster_provision_step_update(
context, provision_step, values)
def cluster_provision_step_get_events(self, context, provision_step):
"""Return all events from the specified ProvisionStep."""
return self._manager.cluster_provision_step_get_events(
def cluster_provision_step_update(self, context, provision_step):
"""Update the cluster provisioning step."""
return self._manager.cluster_provision_step_update(
context, provision_step)
def cluster_provision_step_remove_events(self, context, provision_step):
"""Delete all event from the specified ProvisionStep."""
self._manager.cluster_provision_step_remove_events(
context, provision_step)
def cluster_provision_progress_update(self, context, cluster_id):
"""Return cluster with provision progress updated field."""
return self._manager.cluster_provision_progress_update(
context, cluster_id)
def cluster_event_add(self, context, provision_step, values):
"""Assign new event to the specified ProvisionStep."""
self._manager.cluster_event_add(context, provision_step, values)
"""Assign new event to the specified provision step."""
return self._manager.cluster_event_add(
context, provision_step, values)
class RemoteApi(LocalApi):

View File

@ -473,25 +473,17 @@ class ConductorManager(db_base.Base):
# Events ops
def cluster_provision_step_add(self, context, cluster_id, values):
"""Create a cluster assigned ProvisionStep
from the values dictionary
"""
"""Create a provisioning step assigned to cluster from values dict."""
return self.db.cluster_provision_step_add(context, cluster_id, values)
def cluster_provision_step_update(self, context, provision_step, values):
"""Update the ProvisionStep from the values dictionary."""
self.db.cluster_provision_step_update(context, provision_step, values)
def cluster_provision_step_update(self, context, provision_step):
"""Update the cluster provisioning step."""
return self.db.cluster_provision_step_update(context, provision_step)
def cluster_provision_step_get_events(self, context, provision_step):
"""Return all events from the specified ProvisionStep."""
return self.db.cluster_provision_step_get_events(
context, provision_step)
def cluster_provision_step_remove_events(self, context, provision_step):
"""Delete all event from the specified ProvisionStep."""
self.db.cluster_provision_step_remove_events(context, provision_step)
def cluster_provision_progress_update(self, context, cluster_id):
"""Return cluster with provision progress updated field."""
return self.db.cluster_provision_progress_update(context, cluster_id)
def cluster_event_add(self, context, provision_step, values):
"""Assign new event to the specified ProvisionStep."""
self.db.cluster_event_add(context, provision_step, values)
"""Assign new event to the specified provision step."""
return self.db.cluster_event_add(context, provision_step, values)

View File

@ -293,11 +293,8 @@ class ClusterProvisionStep(object):
tenant_id
step_name
step_type
completed
total
successful
started_at
completed_at
events - list of Events objects assigned to the cluster
"""

View File

@ -274,12 +274,13 @@ def sleep(seconds=0):
class InstanceInfo(object):
def __init__(self, cluster_id=None, instance_id=None, instance_name=None,
node_group_id=None, step_type=None):
node_group_id=None, step_type=None, step_id=None):
self.cluster_id = cluster_id
self.instance_id = instance_id
self.instance_name = instance_name
self.node_group_id = node_group_id
self.step_type = step_type
self.step_id = step_id
def set_step_type(step_type):
@ -291,6 +292,8 @@ class InstanceInfoManager(object):
self.prev_instance_info = current().current_instance_info
if not instance_info.step_type:
instance_info.step_type = self.prev_instance_info.step_type
if not instance_info.step_id:
instance_info.step_id = self.prev_instance_info.step_id
current().current_instance_info = instance_info
def __enter__(self):

View File

@ -110,7 +110,10 @@ def to_dict(func):
def cluster_get(context, cluster, show_progress=False):
"""Return the cluster or None if it does not exist."""
cluster = IMPL.cluster_get(context, cluster)
if show_progress:
cluster = IMPL.cluster_provision_progress_update(context, cluster)
else:
cluster = IMPL.cluster_get(context, cluster)
if cluster:
return cluster.to_dict(show_progress)
return None
@ -446,27 +449,24 @@ def job_binary_internal_get_raw_data(context, job_binary_internal_id):
return IMPL.job_binary_internal_get_raw_data(context,
job_binary_internal_id)
# Events ops
def cluster_provision_step_add(context, cluster_id, values):
"""Create a cluster assigned ProvisionStep from the values dictionary."""
return IMPL.cluster_provision_step_add(context, cluster_id, values)
def cluster_provision_step_update(context, provision_step, values):
"""Update the ProvisionStep from the values dictionary."""
IMPL.cluster_provision_step_update(context, provision_step, values)
def cluster_provision_step_update(context, step_id):
"""Updates provision step."""
return IMPL.cluster_provision_step_update(context, step_id)
def cluster_provision_step_get_events(context, provision_step):
"""Return all events from the specified ProvisionStep."""
return IMPL.cluster_provision_step_get_events(context, provision_step)
def cluster_provision_step_remove_events(context, provision_step):
"""Delete all event from the specified ProvisionStep."""
IMPL.cluster_provision_step_remove_events(context, provision_step)
def cluster_provision_progress_update(context, cluster_id):
"""Return cluster with provision progress updated field."""
return IMPL.cluster_provision_progress_update(context, cluster_id)
def cluster_event_add(context, provision_step, values):
"""Assign new event to the specified ProvisionStep."""
IMPL.cluster_event_add(context, provision_step, values)
"""Assign new event to the specified provision step."""
return IMPL.cluster_event_add(context, provision_step, values)

View File

@ -0,0 +1,44 @@
# Copyright 2015 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""remove redandunt progress ops
Revision ID: 020
Revises: 019
Create Date: 2015-02-26 15:01:41.015076
"""
# revision identifiers, used by Alembic.
revision = '020'
down_revision = '019'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.drop_column('cluster_provision_steps', 'completed_at')
op.drop_column('cluster_provision_steps', 'completed')
op.drop_column('cluster_provision_steps', 'started_at')
def downgrade():
op.add_column('cluster_provision_steps',
sa.Column('completed', sa.Integer(), nullable=True))
op.add_column('cluster_provision_steps',
sa.Column('started_at', sa.DateTime(), nullable=True))
op.add_column('cluster_provision_steps',
sa.Column('completed_at', sa.DateTime(), nullable=True))

View File

@ -1114,6 +1114,22 @@ def _cluster_provision_step_get(context, session, provision_step_id):
return query.filter_by(id=provision_step_id).first()
def _cluster_provision_step_update(context, session, step_id):
step = _cluster_provision_step_get(context, session, step_id)
if step is None:
raise ex.NotFoundException(
step_id,
_("Cluster Provision Step id '%s' not found!"))
if step.successful is not None:
return
if len(step.events) == step.total:
for event in step.events:
session.delete(event)
step.update({'successful': True})
def cluster_provision_step_add(context, cluster_id, values):
session = get_session()
@ -1132,64 +1148,29 @@ def cluster_provision_step_add(context, cluster_id, values):
return provision_step.id
def cluster_provision_step_update(context, provision_step_id, values):
session = get_session()
with session.begin():
provision_step = _cluster_provision_step_get(
context, session, provision_step_id)
if not provision_step:
raise ex.NotFoundException(
provision_step_id,
_("Cluster Provision Step id '%s' not found!"))
provision_step.update(values)
def cluster_provision_step_get_events(context, provision_step_id):
def cluster_provision_step_update(context, step_id):
if CONF.disable_event_log:
return
session = get_session()
with session.begin():
provision_step = _cluster_provision_step_get(
context, session, provision_step_id)
if not provision_step:
raise ex.NotFoundException(
provision_step_id,
_("Cluster Provision Step id '%s' not found!"))
return provision_step.events
_cluster_provision_step_update(context, session, step_id)
def cluster_provision_step_remove_events(context, provision_step_id):
def cluster_provision_progress_update(context, cluster_id):
if CONF.disable_event_log:
return _cluster_get(context, get_session(), cluster_id)
session = get_session()
with session.begin():
provision_step = _cluster_provision_step_get(
context, session, provision_step_id)
cluster = _cluster_get(context, session, cluster_id)
if not provision_step:
raise ex.NotFoundException(
provision_step_id,
_("Cluster Provision Step id '%s' not found!"))
for event in provision_step.events:
session.delete(event)
def cluster_provision_step_remove(context, provision_step_id):
session = get_session()
cluster_provision_step_remove_events(context, provision_step_id)
with session.begin():
provision_step = _cluster_provision_step_get(
context, session, provision_step_id)
if not provision_step:
raise ex.NotFoundException(
provision_step_id,
_("Cluster Provision Step id '%s' not found!"))
session.delete(provision_step)
if cluster is None:
raise ex.NotFoundException(cluster_id,
_("Cluster id '%s' not found!"))
for step in cluster.provision_progress:
if step.successful is None:
_cluster_provision_step_update(context, session, step.id)
result_cluster = _cluster_get(context, session, cluster_id)
return result_cluster
def cluster_event_add(context, step_id, values):
@ -1206,6 +1187,8 @@ def cluster_event_add(context, step_id, values):
event = m.ClusterEvent()
values['step_id'] = step_id
if not values['successful']:
provision_step.update({'successful': False})
event.update(values)
session.add(event)

View File

@ -416,11 +416,8 @@ class ClusterProvisionStep(mb.SaharaBase):
tenant_id = sa.Column(sa.String(36))
step_name = sa.Column(sa.String(80))
step_type = sa.Column(sa.String(36))
completed = sa.Column(sa.Integer)
total = sa.Column(sa.Integer)
successful = sa.Column(sa.Boolean, nullable=True)
started_at = sa.Column(sa.DateTime())
completed_at = sa.Column(sa.DateTime())
events = relationship('ClusterEvent', cascade="all,delete",
backref='ClusterProvisionStep',
lazy='joined')

View File

@ -147,6 +147,8 @@ def _make_periodic_tasks():
continue
terminate_cluster(ctx, cluster, description='transient')
# Add event log info cleanup
context.ctx().current_instance_info = context.InstanceInfo()
context.set_ctx(None)
@periodic_task.periodic_task(spacing=zombie_task_spacing)
@ -185,7 +187,8 @@ def _make_periodic_tasks():
continue
terminate_cluster(ctx, cluster, description='incomplete')
# Add event log info cleanup
context.ctx().current_instance_info = context.InstanceInfo()
context.set_ctx(None)
return SaharaPeriodicTasks()

View File

@ -13,8 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import testtools
from sahara import conductor
from sahara import context
from sahara import exceptions
from sahara.tests.unit import base
from sahara.utils import general as gu
@ -116,6 +120,19 @@ class TestConductorClusterApi(base.SaharaWithDbTestCase):
ng = gu.get_by_id(cluster.node_groups, ng_id)
self.assertEqual(ng.instances[0].instance_name, 'tst123')
def _get_events(self, ctx, cluster_id, step_id=None):
cluster = self.api.cluster_get(ctx, cluster_id, show_progress=True)
events = []
for step in cluster.provision_progress:
if step_id == step['id']:
return step['events']
else:
events += step['events']
if step_id:
return events
else:
return []
def test_events_ops(self):
ctx, cluster = self._make_sample()
@ -138,20 +155,6 @@ class TestConductorClusterApi(base.SaharaWithDbTestCase):
self.assertEqual(st_type, provision_step['step_type'])
self.assertEqual(cluster.id, provision_step['cluster_id'])
# test provision step updating
self.api.cluster_provision_step_update(ctx, step_id, {
'total': 100,
'completed': 59,
})
ncluster = self.api.cluster_get(ctx, cluster.id)
self.assertEqual(1, len(ncluster['provision_progress']))
provision_step = ncluster['provision_progress'][0]
self.assertEqual(100, provision_step['total'])
self.assertEqual(59, provision_step['completed'])
# test adding event to step and getting events from step
self.api.cluster_event_add(ctx, step_id, {
@ -162,14 +165,13 @@ class TestConductorClusterApi(base.SaharaWithDbTestCase):
'successful': True
})
events = self.api.cluster_provision_step_get_events(ctx, step_id)
events = self._get_events(ctx, cluster.id, step_id)
self.assertEqual(1, len(events))
self.assertEqual(st_name, events[0].instance_name)
self.assertEqual(True, events[0].successful)
self.assertEqual(st_info, events[0].event_info)
# test removing events from step
self.api.cluster_destroy(ctx, cluster.id)
self.api.cluster_provision_step_remove_events(ctx, step_id)
events = self.api.cluster_provision_step_get_events(ctx, step_id)
self.assertEqual(0, len(events))
with testtools.ExpectedException(exceptions.NotFoundException):
self._get_events(ctx, cluster.id, step_id)

View File

@ -456,6 +456,14 @@ class SaharaMigrationsCheckers(object):
self.assertColumnExists(engine, 'node_group_templates', 'is_default')
self.assertColumnExists(engine, 'cluster_templates', 'is_default')
def _check_020(self, engine, data):
self.assertColumnNotExists(engine, 'cluster_provision_steps',
'completed')
self.assertColumnNotExists(engine, 'cluster_provision_steps',
'completed_at')
self.assertColumnNotExists(engine, 'cluster_provision_steps',
'started_at')
class TestMigrationsMySQL(SaharaMigrationsCheckers,
base.BaseWalkMigrationTestCase,

View File

@ -83,15 +83,13 @@ class TestAttachVolume(base.SaharaWithDbTestCase):
@mock.patch('sahara.service.volumes._await_attach_volumes')
@mock.patch('sahara.service.volumes._create_attach_volume')
@mock.patch('sahara.utils.cluster_progress_ops.add_successful_event')
@mock.patch('sahara.utils.cluster_progress_ops.update_provisioning_steps')
@mock.patch('sahara.utils.cluster_progress_ops.add_provisioning_step')
def test_attach(self, add_step, update_step, add_event,
def test_attach(self, add_step, add_event,
p_create_attach_vol, p_await, p_mount):
p_create_attach_vol.side_effect = ['/dev/vdb', '/dev/vdc'] * 2
p_await.return_value = None
p_mount.return_value = None
add_event.return_value = None
update_step.return_value = None
add_step.return_value = None
instance1 = {'id': '1',

View File

@ -54,7 +54,7 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
"successful": True
})
cpo.update_provisioning_steps(cluster.id)
self.api.cluster_provision_progress_update(ctx, cluster.id)
# check that we have correct provision step
@ -62,7 +62,6 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
result_step = result_cluster.provision_progress[0]
self.assertEqual(None, result_step.successful)
self.assertEqual(1, result_step.completed)
# check updating in case of successful provision step
@ -71,13 +70,12 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
"successful": True
})
cpo.update_provisioning_steps(cluster.id)
self.api.cluster_provision_progress_update(ctx, cluster.id)
result_cluster = self.api.cluster_get(ctx, cluster.id)
result_step = result_cluster.provision_progress[0]
self.assertEqual(True, result_step.successful)
self.assertEqual(2, result_step.completed)
# check updating in case of failed provision step
@ -91,7 +89,7 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
"successful": False,
})
cpo.update_provisioning_steps(cluster.id)
self.api.cluster_provision_progress_update(ctx, cluster.id)
result_cluster = self.api.cluster_get(ctx, cluster.id)
@ -100,11 +98,7 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
self.assertEqual(False, step.successful)
# check that it's possible to add provision step after failed step
step_id3 = self.api.cluster_provision_step_add(ctx, cluster.id, {
"step_name": "some_name",
"total": 2,
})
step_id3 = cpo.add_provisioning_step(cluster.id, "some_name", 2)
self.assertEqual(
step_id3, cpo.get_current_provisioning_step(cluster.id))
@ -114,9 +108,11 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
step_id1 = self.api.cluster_provision_step_add(ctx, cluster.id, {
'step_name': "some_name1",
'total': 3,
})
step_id2 = self.api.cluster_provision_step_add(ctx, cluster.id, {
'step_name': "some_name",
'total': 2,
})
self.api.cluster_event_add(ctx, step_id1, {

View File

@ -39,6 +39,9 @@ CONF.register_opts(event_log_opts)
def add_successful_event(instance):
if CONF.disable_event_log:
return
cluster_id = instance.cluster_id
step_id = get_current_provisioning_step(cluster_id)
if step_id:
@ -49,10 +52,12 @@ def add_successful_event(instance):
'instance_name': instance.instance_name,
'event_info': None,
})
update_provisioning_steps(cluster_id)
def add_fail_event(instance, exception):
if CONF.disable_event_log:
return
cluster_id = instance.cluster_id
step_id = get_current_provisioning_step(cluster_id)
event_info = six.text_type(exception)
@ -65,16 +70,18 @@ def add_fail_event(instance, exception):
'instance_name': instance.instance_name,
'event_info': event_info,
})
update_provisioning_steps(cluster_id)
def add_provisioning_step(cluster_id, step_name, total):
if CONF.disable_event_log or not g.check_cluster_exists(cluster_id):
return
update_provisioning_steps(cluster_id)
prev_step = get_current_provisioning_step(cluster_id)
if prev_step:
conductor.cluster_provision_step_update(context.ctx(), prev_step)
step_type = context.ctx().current_instance_info.step_type
return conductor.cluster_provision_step_add(
new_step = conductor.cluster_provision_step_add(
context.ctx(), cluster_id, {
'step_name': step_name,
'step_type': step_type,
@ -82,64 +89,15 @@ def add_provisioning_step(cluster_id, step_name, total):
'total': total,
'started_at': timeutils.utcnow(),
})
context.current().current_instance_info.step_id = new_step
return new_step
def get_current_provisioning_step(cluster_id):
if CONF.disable_event_log or not g.check_cluster_exists(cluster_id):
return None
update_provisioning_steps(cluster_id)
ctx = context.ctx()
cluster = conductor.cluster_get(ctx, cluster_id)
for step in cluster.provision_progress:
if step.successful is not None:
continue
return step.id
return None
def update_provisioning_steps(cluster_id):
if CONF.disable_event_log or not g.check_cluster_exists(cluster_id):
return
ctx = context.ctx()
cluster = conductor.cluster_get(ctx, cluster_id)
for step in cluster.provision_progress:
if step.successful is not None:
continue
has_failed = False
successful_events_count = 0
events = conductor.cluster_provision_step_get_events(
ctx, step.id)
for event in events:
if event.successful:
successful_events_count += 1
else:
has_failed = True
successful = None
if has_failed:
successful = False
elif successful_events_count == step.total:
successful = True
completed_at = None
if successful and not step.completed_at:
completed_at = timeutils.utcnow()
conductor.cluster_provision_step_update(ctx, step.id, {
'completed': successful_events_count,
'successful': successful,
'completed_at': completed_at,
})
if successful:
conductor.cluster_provision_step_remove_events(
ctx, step.id)
current_instance_info = context.ctx().current_instance_info
return current_instance_info.step_id
def event_wrapper(mark_successful_on_exit, **spec):

View File

@ -102,6 +102,7 @@ def change_cluster_status(cluster, status, status_description=None):
update_dict["status_description"] = status_description
cluster = conductor.cluster_update(ctx, cluster, update_dict)
conductor.cluster_provision_progress_update(ctx, cluster.id)
LOG.info(_LI("Cluster status has been changed: id={id}, New status="
"{status}").format(id=cluster.id, status=cluster.status))