diff --git a/sahara/conductor/api.py b/sahara/conductor/api.py index 97dd5d2b..f7788b9b 100644 --- a/sahara/conductor/api.py +++ b/sahara/conductor/api.py @@ -442,31 +442,24 @@ class LocalApi(object): # Events ops def cluster_provision_step_add(self, context, cluster_id, values): - """Create a cluster assigned ProvisionStep - - from the values dictionary - """ + """Create a provisioning step assigned to cluster from values dict.""" return self._manager.cluster_provision_step_add( context, cluster_id, values) - def cluster_provision_step_update(self, context, provision_step, values): - """Update the ProvisionStep from the values dictionary.""" - self._manager.cluster_provision_step_update( - context, provision_step, values) - - def cluster_provision_step_get_events(self, context, provision_step): - """Return all events from the specified ProvisionStep.""" - return self._manager.cluster_provision_step_get_events( + def cluster_provision_step_update(self, context, provision_step): + """Update the cluster provisioning step.""" + return self._manager.cluster_provision_step_update( context, provision_step) - def cluster_provision_step_remove_events(self, context, provision_step): - """Delete all event from the specified ProvisionStep.""" - self._manager.cluster_provision_step_remove_events( - context, provision_step) + def cluster_provision_progress_update(self, context, cluster_id): + """Return cluster with provision progress updated field.""" + return self._manager.cluster_provision_progress_update( + context, cluster_id) def cluster_event_add(self, context, provision_step, values): - """Assign new event to the specified ProvisionStep.""" - self._manager.cluster_event_add(context, provision_step, values) + """Assign new event to the specified provision step.""" + return self._manager.cluster_event_add( + context, provision_step, values) class RemoteApi(LocalApi): diff --git a/sahara/conductor/manager.py b/sahara/conductor/manager.py index e5b55668..be53b03f 100644 --- a/sahara/conductor/manager.py +++ b/sahara/conductor/manager.py @@ -473,25 +473,17 @@ class ConductorManager(db_base.Base): # Events ops def cluster_provision_step_add(self, context, cluster_id, values): - """Create a cluster assigned ProvisionStep - - from the values dictionary - """ + """Create a provisioning step assigned to cluster from values dict.""" return self.db.cluster_provision_step_add(context, cluster_id, values) - def cluster_provision_step_update(self, context, provision_step, values): - """Update the ProvisionStep from the values dictionary.""" - self.db.cluster_provision_step_update(context, provision_step, values) + def cluster_provision_step_update(self, context, provision_step): + """Update the cluster provisioning step.""" + return self.db.cluster_provision_step_update(context, provision_step) - def cluster_provision_step_get_events(self, context, provision_step): - """Return all events from the specified ProvisionStep.""" - return self.db.cluster_provision_step_get_events( - context, provision_step) - - def cluster_provision_step_remove_events(self, context, provision_step): - """Delete all event from the specified ProvisionStep.""" - self.db.cluster_provision_step_remove_events(context, provision_step) + def cluster_provision_progress_update(self, context, cluster_id): + """Return cluster with provision progress updated field.""" + return self.db.cluster_provision_progress_update(context, cluster_id) def cluster_event_add(self, context, provision_step, values): - """Assign new event to the specified ProvisionStep.""" - self.db.cluster_event_add(context, provision_step, values) + """Assign new event to the specified provision step.""" + return self.db.cluster_event_add(context, provision_step, values) diff --git a/sahara/conductor/objects.py b/sahara/conductor/objects.py index 626eb3ef..bccc6c90 100644 --- a/sahara/conductor/objects.py +++ b/sahara/conductor/objects.py @@ -293,11 +293,8 @@ class ClusterProvisionStep(object): tenant_id step_name step_type - completed total successful - started_at - completed_at events - list of Events objects assigned to the cluster """ diff --git a/sahara/context.py b/sahara/context.py index 009e59aa..c7cfef39 100644 --- a/sahara/context.py +++ b/sahara/context.py @@ -274,12 +274,13 @@ def sleep(seconds=0): class InstanceInfo(object): def __init__(self, cluster_id=None, instance_id=None, instance_name=None, - node_group_id=None, step_type=None): + node_group_id=None, step_type=None, step_id=None): self.cluster_id = cluster_id self.instance_id = instance_id self.instance_name = instance_name self.node_group_id = node_group_id self.step_type = step_type + self.step_id = step_id def set_step_type(step_type): @@ -291,6 +292,8 @@ class InstanceInfoManager(object): self.prev_instance_info = current().current_instance_info if not instance_info.step_type: instance_info.step_type = self.prev_instance_info.step_type + if not instance_info.step_id: + instance_info.step_id = self.prev_instance_info.step_id current().current_instance_info = instance_info def __enter__(self): diff --git a/sahara/db/api.py b/sahara/db/api.py index c03a21f5..8d1433ac 100644 --- a/sahara/db/api.py +++ b/sahara/db/api.py @@ -110,7 +110,10 @@ def to_dict(func): def cluster_get(context, cluster, show_progress=False): """Return the cluster or None if it does not exist.""" - cluster = IMPL.cluster_get(context, cluster) + if show_progress: + cluster = IMPL.cluster_provision_progress_update(context, cluster) + else: + cluster = IMPL.cluster_get(context, cluster) if cluster: return cluster.to_dict(show_progress) return None @@ -446,27 +449,24 @@ def job_binary_internal_get_raw_data(context, job_binary_internal_id): return IMPL.job_binary_internal_get_raw_data(context, job_binary_internal_id) +# Events ops + def cluster_provision_step_add(context, cluster_id, values): """Create a cluster assigned ProvisionStep from the values dictionary.""" return IMPL.cluster_provision_step_add(context, cluster_id, values) -def cluster_provision_step_update(context, provision_step, values): - """Update the ProvisionStep from the values dictionary.""" - IMPL.cluster_provision_step_update(context, provision_step, values) +def cluster_provision_step_update(context, step_id): + """Updates provision step.""" + return IMPL.cluster_provision_step_update(context, step_id) -def cluster_provision_step_get_events(context, provision_step): - """Return all events from the specified ProvisionStep.""" - return IMPL.cluster_provision_step_get_events(context, provision_step) - - -def cluster_provision_step_remove_events(context, provision_step): - """Delete all event from the specified ProvisionStep.""" - IMPL.cluster_provision_step_remove_events(context, provision_step) +def cluster_provision_progress_update(context, cluster_id): + """Return cluster with provision progress updated field.""" + return IMPL.cluster_provision_progress_update(context, cluster_id) def cluster_event_add(context, provision_step, values): - """Assign new event to the specified ProvisionStep.""" - IMPL.cluster_event_add(context, provision_step, values) + """Assign new event to the specified provision step.""" + return IMPL.cluster_event_add(context, provision_step, values) diff --git a/sahara/db/migration/alembic_migrations/versions/020_remove_redandunt_progress_ops.py b/sahara/db/migration/alembic_migrations/versions/020_remove_redandunt_progress_ops.py new file mode 100644 index 00000000..adb96d01 --- /dev/null +++ b/sahara/db/migration/alembic_migrations/versions/020_remove_redandunt_progress_ops.py @@ -0,0 +1,44 @@ +# Copyright 2015 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""remove redandunt progress ops + +Revision ID: 020 +Revises: 019 +Create Date: 2015-02-26 15:01:41.015076 + +""" + +# revision identifiers, used by Alembic. +revision = '020' +down_revision = '019' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.drop_column('cluster_provision_steps', 'completed_at') + op.drop_column('cluster_provision_steps', 'completed') + op.drop_column('cluster_provision_steps', 'started_at') + + +def downgrade(): + op.add_column('cluster_provision_steps', + sa.Column('completed', sa.Integer(), nullable=True)) + op.add_column('cluster_provision_steps', + sa.Column('started_at', sa.DateTime(), nullable=True)) + op.add_column('cluster_provision_steps', + sa.Column('completed_at', sa.DateTime(), nullable=True)) diff --git a/sahara/db/sqlalchemy/api.py b/sahara/db/sqlalchemy/api.py index 73c36437..d31ba541 100644 --- a/sahara/db/sqlalchemy/api.py +++ b/sahara/db/sqlalchemy/api.py @@ -1114,6 +1114,22 @@ def _cluster_provision_step_get(context, session, provision_step_id): return query.filter_by(id=provision_step_id).first() +def _cluster_provision_step_update(context, session, step_id): + step = _cluster_provision_step_get(context, session, step_id) + + if step is None: + raise ex.NotFoundException( + step_id, + _("Cluster Provision Step id '%s' not found!")) + + if step.successful is not None: + return + if len(step.events) == step.total: + for event in step.events: + session.delete(event) + step.update({'successful': True}) + + def cluster_provision_step_add(context, cluster_id, values): session = get_session() @@ -1132,64 +1148,29 @@ def cluster_provision_step_add(context, cluster_id, values): return provision_step.id -def cluster_provision_step_update(context, provision_step_id, values): - session = get_session() - - with session.begin(): - provision_step = _cluster_provision_step_get( - context, session, provision_step_id) - - if not provision_step: - raise ex.NotFoundException( - provision_step_id, - _("Cluster Provision Step id '%s' not found!")) - - provision_step.update(values) - - -def cluster_provision_step_get_events(context, provision_step_id): +def cluster_provision_step_update(context, step_id): + if CONF.disable_event_log: + return session = get_session() with session.begin(): - provision_step = _cluster_provision_step_get( - context, session, provision_step_id) - - if not provision_step: - raise ex.NotFoundException( - provision_step_id, - _("Cluster Provision Step id '%s' not found!")) - - return provision_step.events + _cluster_provision_step_update(context, session, step_id) -def cluster_provision_step_remove_events(context, provision_step_id): +def cluster_provision_progress_update(context, cluster_id): + if CONF.disable_event_log: + return _cluster_get(context, get_session(), cluster_id) session = get_session() - with session.begin(): - provision_step = _cluster_provision_step_get( - context, session, provision_step_id) + cluster = _cluster_get(context, session, cluster_id) - if not provision_step: - raise ex.NotFoundException( - provision_step_id, - _("Cluster Provision Step id '%s' not found!")) - - for event in provision_step.events: - session.delete(event) - - -def cluster_provision_step_remove(context, provision_step_id): - session = get_session() - cluster_provision_step_remove_events(context, provision_step_id) - with session.begin(): - provision_step = _cluster_provision_step_get( - context, session, provision_step_id) - - if not provision_step: - raise ex.NotFoundException( - provision_step_id, - _("Cluster Provision Step id '%s' not found!")) - - session.delete(provision_step) + if cluster is None: + raise ex.NotFoundException(cluster_id, + _("Cluster id '%s' not found!")) + for step in cluster.provision_progress: + if step.successful is None: + _cluster_provision_step_update(context, session, step.id) + result_cluster = _cluster_get(context, session, cluster_id) + return result_cluster def cluster_event_add(context, step_id, values): @@ -1206,6 +1187,8 @@ def cluster_event_add(context, step_id, values): event = m.ClusterEvent() values['step_id'] = step_id + if not values['successful']: + provision_step.update({'successful': False}) event.update(values) session.add(event) diff --git a/sahara/db/sqlalchemy/models.py b/sahara/db/sqlalchemy/models.py index f0b2d2cd..ca791e59 100644 --- a/sahara/db/sqlalchemy/models.py +++ b/sahara/db/sqlalchemy/models.py @@ -416,11 +416,8 @@ class ClusterProvisionStep(mb.SaharaBase): tenant_id = sa.Column(sa.String(36)) step_name = sa.Column(sa.String(80)) step_type = sa.Column(sa.String(36)) - completed = sa.Column(sa.Integer) total = sa.Column(sa.Integer) successful = sa.Column(sa.Boolean, nullable=True) - started_at = sa.Column(sa.DateTime()) - completed_at = sa.Column(sa.DateTime()) events = relationship('ClusterEvent', cascade="all,delete", backref='ClusterProvisionStep', lazy='joined') diff --git a/sahara/service/periodic.py b/sahara/service/periodic.py index 6ab93a92..e90c4340 100644 --- a/sahara/service/periodic.py +++ b/sahara/service/periodic.py @@ -147,6 +147,8 @@ def _make_periodic_tasks(): continue terminate_cluster(ctx, cluster, description='transient') + # Add event log info cleanup + context.ctx().current_instance_info = context.InstanceInfo() context.set_ctx(None) @periodic_task.periodic_task(spacing=zombie_task_spacing) @@ -185,7 +187,8 @@ def _make_periodic_tasks(): continue terminate_cluster(ctx, cluster, description='incomplete') - + # Add event log info cleanup + context.ctx().current_instance_info = context.InstanceInfo() context.set_ctx(None) return SaharaPeriodicTasks() diff --git a/sahara/tests/unit/conductor/test_api.py b/sahara/tests/unit/conductor/test_api.py index 0688b4ee..bf1418c6 100644 --- a/sahara/tests/unit/conductor/test_api.py +++ b/sahara/tests/unit/conductor/test_api.py @@ -13,8 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. + +import testtools + from sahara import conductor from sahara import context +from sahara import exceptions from sahara.tests.unit import base from sahara.utils import general as gu @@ -116,6 +120,19 @@ class TestConductorClusterApi(base.SaharaWithDbTestCase): ng = gu.get_by_id(cluster.node_groups, ng_id) self.assertEqual(ng.instances[0].instance_name, 'tst123') + def _get_events(self, ctx, cluster_id, step_id=None): + cluster = self.api.cluster_get(ctx, cluster_id, show_progress=True) + events = [] + for step in cluster.provision_progress: + if step_id == step['id']: + return step['events'] + else: + events += step['events'] + if step_id: + return events + else: + return [] + def test_events_ops(self): ctx, cluster = self._make_sample() @@ -138,20 +155,6 @@ class TestConductorClusterApi(base.SaharaWithDbTestCase): self.assertEqual(st_type, provision_step['step_type']) self.assertEqual(cluster.id, provision_step['cluster_id']) - # test provision step updating - - self.api.cluster_provision_step_update(ctx, step_id, { - 'total': 100, - 'completed': 59, - }) - - ncluster = self.api.cluster_get(ctx, cluster.id) - self.assertEqual(1, len(ncluster['provision_progress'])) - provision_step = ncluster['provision_progress'][0] - - self.assertEqual(100, provision_step['total']) - self.assertEqual(59, provision_step['completed']) - # test adding event to step and getting events from step self.api.cluster_event_add(ctx, step_id, { @@ -162,14 +165,13 @@ class TestConductorClusterApi(base.SaharaWithDbTestCase): 'successful': True }) - events = self.api.cluster_provision_step_get_events(ctx, step_id) + events = self._get_events(ctx, cluster.id, step_id) self.assertEqual(1, len(events)) self.assertEqual(st_name, events[0].instance_name) self.assertEqual(True, events[0].successful) self.assertEqual(st_info, events[0].event_info) - # test removing events from step + self.api.cluster_destroy(ctx, cluster.id) - self.api.cluster_provision_step_remove_events(ctx, step_id) - events = self.api.cluster_provision_step_get_events(ctx, step_id) - self.assertEqual(0, len(events)) + with testtools.ExpectedException(exceptions.NotFoundException): + self._get_events(ctx, cluster.id, step_id) diff --git a/sahara/tests/unit/db/migration/test_migrations.py b/sahara/tests/unit/db/migration/test_migrations.py index 5b106f96..be8b02b6 100644 --- a/sahara/tests/unit/db/migration/test_migrations.py +++ b/sahara/tests/unit/db/migration/test_migrations.py @@ -456,6 +456,14 @@ class SaharaMigrationsCheckers(object): self.assertColumnExists(engine, 'node_group_templates', 'is_default') self.assertColumnExists(engine, 'cluster_templates', 'is_default') + def _check_020(self, engine, data): + self.assertColumnNotExists(engine, 'cluster_provision_steps', + 'completed') + self.assertColumnNotExists(engine, 'cluster_provision_steps', + 'completed_at') + self.assertColumnNotExists(engine, 'cluster_provision_steps', + 'started_at') + class TestMigrationsMySQL(SaharaMigrationsCheckers, base.BaseWalkMigrationTestCase, diff --git a/sahara/tests/unit/service/test_volumes.py b/sahara/tests/unit/service/test_volumes.py index 670504cd..7e091745 100644 --- a/sahara/tests/unit/service/test_volumes.py +++ b/sahara/tests/unit/service/test_volumes.py @@ -83,15 +83,13 @@ class TestAttachVolume(base.SaharaWithDbTestCase): @mock.patch('sahara.service.volumes._await_attach_volumes') @mock.patch('sahara.service.volumes._create_attach_volume') @mock.patch('sahara.utils.cluster_progress_ops.add_successful_event') - @mock.patch('sahara.utils.cluster_progress_ops.update_provisioning_steps') @mock.patch('sahara.utils.cluster_progress_ops.add_provisioning_step') - def test_attach(self, add_step, update_step, add_event, + def test_attach(self, add_step, add_event, p_create_attach_vol, p_await, p_mount): p_create_attach_vol.side_effect = ['/dev/vdb', '/dev/vdc'] * 2 p_await.return_value = None p_mount.return_value = None add_event.return_value = None - update_step.return_value = None add_step.return_value = None instance1 = {'id': '1', diff --git a/sahara/tests/unit/utils/test_cluster_progress_ops.py b/sahara/tests/unit/utils/test_cluster_progress_ops.py index 415a9d61..8c2a1ee7 100644 --- a/sahara/tests/unit/utils/test_cluster_progress_ops.py +++ b/sahara/tests/unit/utils/test_cluster_progress_ops.py @@ -54,7 +54,7 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase): "successful": True }) - cpo.update_provisioning_steps(cluster.id) + self.api.cluster_provision_progress_update(ctx, cluster.id) # check that we have correct provision step @@ -62,7 +62,6 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase): result_step = result_cluster.provision_progress[0] self.assertEqual(None, result_step.successful) - self.assertEqual(1, result_step.completed) # check updating in case of successful provision step @@ -71,13 +70,12 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase): "successful": True }) - cpo.update_provisioning_steps(cluster.id) + self.api.cluster_provision_progress_update(ctx, cluster.id) result_cluster = self.api.cluster_get(ctx, cluster.id) result_step = result_cluster.provision_progress[0] self.assertEqual(True, result_step.successful) - self.assertEqual(2, result_step.completed) # check updating in case of failed provision step @@ -91,7 +89,7 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase): "successful": False, }) - cpo.update_provisioning_steps(cluster.id) + self.api.cluster_provision_progress_update(ctx, cluster.id) result_cluster = self.api.cluster_get(ctx, cluster.id) @@ -100,11 +98,7 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase): self.assertEqual(False, step.successful) # check that it's possible to add provision step after failed step - - step_id3 = self.api.cluster_provision_step_add(ctx, cluster.id, { - "step_name": "some_name", - "total": 2, - }) + step_id3 = cpo.add_provisioning_step(cluster.id, "some_name", 2) self.assertEqual( step_id3, cpo.get_current_provisioning_step(cluster.id)) @@ -114,9 +108,11 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase): step_id1 = self.api.cluster_provision_step_add(ctx, cluster.id, { 'step_name': "some_name1", + 'total': 3, }) step_id2 = self.api.cluster_provision_step_add(ctx, cluster.id, { 'step_name': "some_name", + 'total': 2, }) self.api.cluster_event_add(ctx, step_id1, { diff --git a/sahara/utils/cluster_progress_ops.py b/sahara/utils/cluster_progress_ops.py index 729436b1..5d2d8f47 100644 --- a/sahara/utils/cluster_progress_ops.py +++ b/sahara/utils/cluster_progress_ops.py @@ -39,6 +39,9 @@ CONF.register_opts(event_log_opts) def add_successful_event(instance): + if CONF.disable_event_log: + return + cluster_id = instance.cluster_id step_id = get_current_provisioning_step(cluster_id) if step_id: @@ -49,10 +52,12 @@ def add_successful_event(instance): 'instance_name': instance.instance_name, 'event_info': None, }) - update_provisioning_steps(cluster_id) def add_fail_event(instance, exception): + if CONF.disable_event_log: + return + cluster_id = instance.cluster_id step_id = get_current_provisioning_step(cluster_id) event_info = six.text_type(exception) @@ -65,16 +70,18 @@ def add_fail_event(instance, exception): 'instance_name': instance.instance_name, 'event_info': event_info, }) - update_provisioning_steps(cluster_id) def add_provisioning_step(cluster_id, step_name, total): if CONF.disable_event_log or not g.check_cluster_exists(cluster_id): return - update_provisioning_steps(cluster_id) + prev_step = get_current_provisioning_step(cluster_id) + if prev_step: + conductor.cluster_provision_step_update(context.ctx(), prev_step) + step_type = context.ctx().current_instance_info.step_type - return conductor.cluster_provision_step_add( + new_step = conductor.cluster_provision_step_add( context.ctx(), cluster_id, { 'step_name': step_name, 'step_type': step_type, @@ -82,64 +89,15 @@ def add_provisioning_step(cluster_id, step_name, total): 'total': total, 'started_at': timeutils.utcnow(), }) + context.current().current_instance_info.step_id = new_step + return new_step def get_current_provisioning_step(cluster_id): if CONF.disable_event_log or not g.check_cluster_exists(cluster_id): return None - - update_provisioning_steps(cluster_id) - ctx = context.ctx() - cluster = conductor.cluster_get(ctx, cluster_id) - for step in cluster.provision_progress: - if step.successful is not None: - continue - - return step.id - - return None - - -def update_provisioning_steps(cluster_id): - if CONF.disable_event_log or not g.check_cluster_exists(cluster_id): - return - - ctx = context.ctx() - cluster = conductor.cluster_get(ctx, cluster_id) - - for step in cluster.provision_progress: - if step.successful is not None: - continue - - has_failed = False - successful_events_count = 0 - events = conductor.cluster_provision_step_get_events( - ctx, step.id) - for event in events: - if event.successful: - successful_events_count += 1 - else: - has_failed = True - - successful = None - if has_failed: - successful = False - elif successful_events_count == step.total: - successful = True - - completed_at = None - if successful and not step.completed_at: - completed_at = timeutils.utcnow() - - conductor.cluster_provision_step_update(ctx, step.id, { - 'completed': successful_events_count, - 'successful': successful, - 'completed_at': completed_at, - }) - - if successful: - conductor.cluster_provision_step_remove_events( - ctx, step.id) + current_instance_info = context.ctx().current_instance_info + return current_instance_info.step_id def event_wrapper(mark_successful_on_exit, **spec): diff --git a/sahara/utils/general.py b/sahara/utils/general.py index b0341ecf..c45f7961 100644 --- a/sahara/utils/general.py +++ b/sahara/utils/general.py @@ -102,6 +102,7 @@ def change_cluster_status(cluster, status, status_description=None): update_dict["status_description"] = status_description cluster = conductor.cluster_update(ctx, cluster, update_dict) + conductor.cluster_provision_progress_update(ctx, cluster.id) LOG.info(_LI("Cluster status has been changed: id={id}, New status=" "{status}").format(id=cluster.id, status=cluster.status))