diff --git a/nailgun/nailgun/db/migration/alembic_migrations/versions/fuel_9_1.py b/nailgun/nailgun/db/migration/alembic_migrations/versions/fuel_9_1.py index b60ebc2907..a35202a5cd 100644 --- a/nailgun/nailgun/db/migration/alembic_migrations/versions/fuel_9_1.py +++ b/nailgun/nailgun/db/migration/alembic_migrations/versions/fuel_9_1.py @@ -59,9 +59,11 @@ def upgrade(): upgrade_deployment_graphs_attributes() upgrade_deployment_history_summary() upgrade_node_deployment_info() + upgrade_add_task_start_end_time() def downgrade(): + downgrade_add_task_start_end_time() downgrade_node_deployment_info() downgrade_deployment_history_summary() downgrade_deployment_graphs_attributes() @@ -443,3 +445,28 @@ def upgrade_node_deployment_info(): def downgrade_node_deployment_info(): op.drop_table('node_deployment_info') + + +def upgrade_add_task_start_end_time(): + op.add_column( + 'tasks', + sa.Column( + 'time_start', + sa.TIMESTAMP(), + nullable=True, + ) + ) + + op.add_column( + 'tasks', + sa.Column( + 'time_end', + sa.TIMESTAMP(), + nullable=True, + ) + ) + + +def downgrade_add_task_start_end_time(): + op.drop_column('tasks', 'time_start') + op.drop_column('tasks', 'time_end') diff --git a/nailgun/nailgun/db/sqlalchemy/models/task.py b/nailgun/nailgun/db/sqlalchemy/models/task.py index 10d3cf6f98..00d9fc9d6b 100644 --- a/nailgun/nailgun/db/sqlalchemy/models/task.py +++ b/nailgun/nailgun/db/sqlalchemy/models/task.py @@ -26,6 +26,7 @@ from sqlalchemy import Index from sqlalchemy import Integer from sqlalchemy import String from sqlalchemy import Text +from sqlalchemy import TIMESTAMP from sqlalchemy.orm import relationship, backref, deferred from nailgun import consts @@ -94,6 +95,9 @@ class Task(Base): deployment_history = relationship( "DeploymentHistory", backref="task", cascade="all,delete") + time_start = Column(TIMESTAMP(), nullable=True) + time_end = Column(TIMESTAMP(), nullable=True) + def __repr__(self): return "".format( self.name, diff --git a/nailgun/nailgun/objects/serializers/task.py b/nailgun/nailgun/objects/serializers/task.py index 9318dc0eb0..ad735823ad 100644 --- a/nailgun/nailgun/objects/serializers/task.py +++ b/nailgun/nailgun/objects/serializers/task.py @@ -31,4 +31,6 @@ class TaskSerializer(BasicSerializer): "parent_id", "dry_run", "graph_type", + "time_start", + "time_end" ) diff --git a/nailgun/nailgun/objects/serializers/transaction.py b/nailgun/nailgun/objects/serializers/transaction.py index 957242064e..2c562b671f 100644 --- a/nailgun/nailgun/objects/serializers/transaction.py +++ b/nailgun/nailgun/objects/serializers/transaction.py @@ -30,4 +30,6 @@ class TransactionSerializer(BasicSerializer): "progress", "dry_run", "graph_type", + "time_start", + "time_end" ) diff --git a/nailgun/nailgun/objects/task.py b/nailgun/nailgun/objects/task.py index 6d7e97dd47..096f338bd9 100644 --- a/nailgun/nailgun/objects/task.py +++ b/nailgun/nailgun/objects/task.py @@ -99,14 +99,18 @@ class Task(NailgunObject): messages.append(msg) statuses.append(status) if any(st == 'error' for st in statuses): - instance.status = 'error' + status = consts.TASK_STATUSES.error else: - instance.status = status or instance.status + status = status or previous_status + instance.progress = progress or instance.progress instance.result = result or instance.result # join messages if not None or "" instance.message = '\n'.join([m for m in messages if m]) - if previous_status != instance.status and instance.cluster_id: + + cls.update_status(instance, status) + + if previous_status != status and instance.cluster_id: logger.debug("Updating cluster status: " "cluster_id: %s status: %s", instance.cluster_id, status) @@ -123,6 +127,7 @@ class Task(NailgunObject): data['status'] = consts.TASK_STATUSES.ready data['progress'] = 100 + data['time_end'] = datetime.utcnow() data['message'] = u'\n'.join(map( lambda s: s.message, filter( lambda s: s.message is not None, subtasks))) @@ -137,10 +142,12 @@ class Task(NailgunObject): consts.TASK_STATUSES.ready): subtask.status = consts.TASK_STATUSES.error subtask.progress = 100 + subtasks.time_end = datetime.utcnow() subtask.message = "Task aborted" data['status'] = consts.TASK_STATUSES.error data['progress'] = 100 + data['time_end'] = datetime.utcnow() data['message'] = u'\n'.join(list(set(map( lambda s: (s.message or ""), filter( lambda s: ( @@ -156,6 +163,7 @@ class Task(NailgunObject): map(lambda s: s.status in (consts.TASK_STATUSES.running, consts.TASK_STATUSES.ready), subtasks)): + instance.time_start = datetime.utcnow() instance.status = consts.TASK_STATUSES.running else: @@ -293,19 +301,21 @@ class Task(NailgunObject): def update(cls, instance, data): logger.debug("Updating task: %s", instance.uuid) clean_data = cls._clean_data(data) + status = clean_data.pop('status', None) super(Task, cls).update(instance, clean_data) - db().flush() + if status: + cls.update_status(instance, status) # update cluster only if task status was updated - if instance.cluster_id and 'status' in clean_data: - logger.debug("Updating cluster status: %s " - "cluster_id: %s status: %s", - instance.uuid, instance.cluster_id, - data.get('status')) + if instance.cluster_id and status: + logger.debug( + "Updating cluster status: %s cluster_id: %s status: %s", + instance.uuid, instance.cluster_id, status) cls._update_cluster_data(instance) - if instance.parent and \ - {'status', 'message', 'progress'}.intersection(clean_data): + message = clean_data.get('message') + progress = clean_data.get('progress') + if instance.parent and (status or message or progress): logger.debug("Updating parent task: %s.", instance.parent.uuid) cls._update_parent_instance(instance.parent) @@ -327,6 +337,31 @@ class Task(NailgunObject): .update({'deleted_at': datetime.utcnow()}, synchronize_session='fetch') + @classmethod + def update_status(cls, instance, new_status): + if instance.status == new_status: + return + + finish_status = ( + consts.TASK_STATUSES.ready, consts.TASK_STATUSES.error + ) + data = {'status': new_status} + if instance.status == consts.TASK_STATUSES.pending: + data['time_start'] = datetime.utcnow() + + if new_status in finish_status: + data['time_end'] = datetime.utcnow() + + super(Task, cls).update(instance, data) + + @classmethod + def on_start(cls, instance): + instance.time_start = datetime.utcnow() + + @classmethod + def on_finish(cls, instance): + instance.time_end = datetime.utcnow() + class TaskCollection(NailgunCollection): diff --git a/nailgun/nailgun/objects/transaction.py b/nailgun/nailgun/objects/transaction.py index 9fd9ac5af4..0905b2810a 100644 --- a/nailgun/nailgun/objects/transaction.py +++ b/nailgun/nailgun/objects/transaction.py @@ -14,6 +14,8 @@ # License for the specific language governing permissions and limitations # under the License. +from datetime import datetime + from nailgun import consts from nailgun.db import db from nailgun.db.sqlalchemy import models @@ -94,6 +96,26 @@ class Transaction(NailgunObject): if instance is not None: return instance.tasks_snapshot + @classmethod + def on_start(cls, instance): + cls.update(instance, { + 'time_start': datetime.utcnow(), + 'status': consts.TASK_STATUSES.running + }) + + @classmethod + def on_finish(cls, instance, status, message=None): + data = { + 'progress': 100, + 'status': status, + 'time_end': datetime.utcnow(), + } + if message is not None: + data['message'] = message + + # set time start the same time of there is no time start + cls.update(instance, data) + class TransactionCollection(NailgunCollection): diff --git a/nailgun/nailgun/test/integration/test_task_managers.py b/nailgun/nailgun/test/integration/test_task_managers.py index d7d9ad8cf5..637e3a3096 100644 --- a/nailgun/nailgun/test/integration/test_task_managers.py +++ b/nailgun/nailgun/test/integration/test_task_managers.py @@ -56,6 +56,11 @@ class TestTaskManagers(BaseIntegrationTest): self.assertEqual(task_.cluster_id, None) self.assertNotEqual(task_.deleted_at, None) + def _check_timing(self, task): + self.assertIsNotNone(task.time_start) + self.assertIsNotNone(task.time_end) + self.assertLessEqual(task.time_start, task.time_end) + def set_history_ready(self): objects.DeploymentHistoryCollection.all().update( {'status': consts.HISTORY_TASK_STATUSES.ready}) @@ -77,6 +82,7 @@ class TestTaskManagers(BaseIntegrationTest): self.env.refresh_nodes() self.assertEqual(supertask.name, TASK_NAMES.deploy) self.assertEqual(supertask.status, consts.TASK_STATUSES.ready) + self._check_timing(supertask) # we have three subtasks here # deletion # provision @@ -87,11 +93,13 @@ class TestTaskManagers(BaseIntegrationTest): t for t in supertask.subtasks if t.name == consts.TASK_NAMES.provision ) + self._check_timing(provision_task) self.assertEqual(provision_task.weight, 0.4) deployment_task = next( t for t in supertask.subtasks if t.name == consts.TASK_NAMES.deployment ) + self._check_timing(deployment_task) self.assertEqual( consts.DEFAULT_DEPLOYMENT_GRAPH_TYPE, deployment_task.graph_type ) diff --git a/nailgun/nailgun/test/integration/test_transactions_manager.py b/nailgun/nailgun/test/integration/test_transactions_manager.py index bf9ffb637d..254dc1c5ac 100644 --- a/nailgun/nailgun/test/integration/test_transactions_manager.py +++ b/nailgun/nailgun/test/integration/test_transactions_manager.py @@ -81,6 +81,11 @@ class TestTransactionManager(base.BaseIntegrationTest): progress=100, status=consts.TASK_STATUSES.error) + def _check_timing(self, task): + self.assertIsNotNone(task.time_start) + self.assertIsNotNone(task.time_end) + self.assertLessEqual(task.time_start, task.time_end) + @mock.patch('nailgun.transactions.manager.rpc') def test_execute_graph(self, rpc_mock): task = self.manager.execute(graphs=[{"type": "test_graph"}]) @@ -114,6 +119,7 @@ class TestTransactionManager(base.BaseIntegrationTest): self._success(task.subtasks[0].uuid) self.assertEqual(task.status, consts.TASK_STATUSES.ready) + self._check_timing(task) self.assertEqual( consts.CLUSTER_STATUSES.operational, self.cluster.status ) @@ -206,7 +212,7 @@ class TestTransactionManager(base.BaseIntegrationTest): # Consider we've got success from Astute. self._success(task.subtasks[1].uuid) - + self._check_timing(task.subtasks[1]) # Ensure the top leve transaction is ready. self.assertEqual(task.status, consts.TASK_STATUSES.ready) @@ -270,6 +276,8 @@ class TestTransactionManager(base.BaseIntegrationTest): self.assertEqual(rpc_mock.cast.call_count, 1) self.assertEqual(task.status, consts.TASK_STATUSES.error) + self._check_timing(task.subtasks[0]) + self._check_timing(task.subtasks[1]) self.assertEqual( consts.CLUSTER_STATUSES.partially_deployed, self.cluster.status ) diff --git a/nailgun/nailgun/transactions/manager.py b/nailgun/nailgun/transactions/manager.py index 047468cdce..69e04d018c 100644 --- a/nailgun/nailgun/transactions/manager.py +++ b/nailgun/nailgun/transactions/manager.py @@ -170,6 +170,7 @@ class TransactionsManager(object): 'status': consts.TASK_STATUSES.running, 'dry_run': dry_run or noop_run, }) + objects.Transaction.on_start(transaction) helpers.TaskHelper.create_action_log(transaction) for graph in graphs: @@ -185,7 +186,7 @@ class TransactionsManager(object): cache['dry_run'] = dry_run cache['debug'] = debug - sub_transaction = transaction.create_subtask( + transaction.create_subtask( self.task_name, status=consts.TASK_STATUSES.pending, dry_run=dry_run or noop_run, @@ -196,7 +197,6 @@ class TransactionsManager(object): # FIXME: Consider to use a separate set of columns. cache=cache, ) - helpers.TaskHelper.create_action_log(sub_transaction) # We need to commit transaction because asynchronous call below might # be executed in separate process or thread. @@ -267,6 +267,7 @@ class TransactionsManager(object): _update_transaction(transaction, status, progress, error) if status in (consts.TASK_STATUSES.error, consts.TASK_STATUSES.ready): + objects.Transaction.on_finish(transaction, status) helpers.TaskHelper.update_action_log(transaction) if transaction.parent: # if transaction is completed successfully, @@ -277,10 +278,8 @@ class TransactionsManager(object): self.fail(transaction.parent, error) def success(self, transaction): - objects.Transaction.update( - transaction, - {'status': consts.TASK_STATUSES.ready, 'progress': 100} - ) + objects.Transaction.on_finish(transaction, consts.TASK_STATUSES.ready) + helpers.TaskHelper.update_action_log(transaction) _update_cluster_status(transaction) notifier.notify( consts.NOTIFICATION_TOPICS.done, @@ -292,19 +291,18 @@ class TransactionsManager(object): ) def fail(self, transaction, reason): - data = { - 'status': consts.TASK_STATUSES.error, - 'message': reason, - 'progress': 100 - } - objects.Transaction.update(transaction, data) + objects.Transaction.on_finish( + transaction, consts.TASK_STATUSES.error, message=reason + ) helpers.TaskHelper.update_action_log(transaction) - - data['message'] = 'Aborted' for sub_transaction in transaction.subtasks: if sub_transaction.status == consts.TASK_STATUSES.pending: - objects.Transaction.update(sub_transaction, data) - helpers.TaskHelper.update_action_log(sub_transaction) + # on_start and on_finish called to properly handle + # status transition + objects.Transaction.on_start(sub_transaction) + objects.Transaction.on_finish( + sub_transaction, consts.TASK_STATUSES.error, "Aborted" + ) _update_cluster_status(transaction) notifier.notify( @@ -367,6 +365,8 @@ class TransactionsManager(object): message = make_astute_message( sub_transaction, context, graph, resolver ) + objects.Transaction.on_start(sub_transaction) + helpers.TaskHelper.create_action_log(sub_transaction) # Once rpc.cast() is called, the message is sent to Astute. By # that moment all transaction instanced must exist in database,