diff --git a/nailgun/nailgun/objects/task.py b/nailgun/nailgun/objects/task.py
index 675dc11e91..ef3a0d522d 100644
--- a/nailgun/nailgun/objects/task.py
+++ b/nailgun/nailgun/objects/task.py
@@ -289,60 +289,6 @@ class Task(NailgunObject):
         result.pop('status', None)
         return result
 
-    @classmethod
-    def update_recursively(cls, instance, data):
-        logger.debug("Updating task: %s", instance.uuid)
-        clean_data = cls._clean_data(data)
-        super(Task, cls).update(instance, data)
-        if instance.parent:
-            parent = instance.parent
-            siblings = parent.subtasks
-            status = clean_data.get('status')
-            if status == consts.TASK_STATUSES.ready:
-                clean_data['progress'] = 100
-                instance.progress = 100
-                ready_siblings_count = sum(
-                    x.status == consts.TASK_STATUSES.ready for x in siblings
-                )
-                if ready_siblings_count == len(siblings):
-                    parent.status = consts.TASK_STATUSES.ready
-            elif status == consts.TASK_STATUSES.error:
-                parent.status = consts.TASK_STATUSES.error
-                for s in siblings:
-                    if s.status != consts.TASK_STATUSES.ready:
-                        s.status = consts.TASK_STATUSES.error
-                        s.progress = 100
-                        s.message = "Task aborted"
-                clean_data['progress'] = 100
-                instance.progress = 100
-                TaskHelper.update_action_log(parent)
-            elif status == consts.TASK_STATUSES.running:
-                parent.status = consts.TASK_STATUSES.running
-
-            if 'progress' in clean_data:
-                total_progress = sum(x.progress for x in siblings)
-                parent.progress = total_progress // len(siblings)
-
-            task_status = parent.status
-        else:
-            task_status = instance.status
-
-        if not instance.dry_run:
-            if task_status == consts.TASK_STATUSES.ready:
-                cls._update_cluster_status(
-                    instance.cluster,
-                    consts.CLUSTER_STATUSES.operational,
-                    consts.NODE_STATUSES.ready
-                )
-            elif task_status == consts.TASK_STATUSES.error:
-                cls._update_cluster_status(
-                    instance.cluster,
-                    consts.CLUSTER_STATUSES.error,
-                    None
-                )
-
-        db().flush()
-
     @classmethod
     def update(cls, instance, data):
         logger.debug("Updating task: %s", instance.uuid)
diff --git a/nailgun/nailgun/rpc/receiver.py b/nailgun/nailgun/rpc/receiver.py
index dea8418d81..c88297030a 100644
--- a/nailgun/nailgun/rpc/receiver.py
+++ b/nailgun/nailgun/rpc/receiver.py
@@ -241,7 +241,6 @@ class NailgunReceiver(object):
             fail_if_not_found=True,
             lock_for_update=True,
         )
-
        manager = transactions.TransactionsManager(transaction.cluster.id)
        manager.process(transaction, kwargs)
 
@@ -495,7 +494,13 @@ class NailgunReceiver(object):
         elif status == consts.TASK_STATUSES.ready:
             data = cls._success_action(task, status, progress, nodes)
         else:
-            data = {'status': status, 'progress': progress, 'message': message}
+            data = {}
+            if status:
+                data['status'] = status
+            if progress:
+                data['progress'] = progress
+            if message:
+                data['message'] = message
         return data
 
     @classmethod
diff --git a/nailgun/nailgun/test/integration/test_transactions_manager.py b/nailgun/nailgun/test/integration/test_transactions_manager.py
index 9bbcdfc798..0096a3f7ca 100644
--- a/nailgun/nailgun/test/integration/test_transactions_manager.py
+++ b/nailgun/nailgun/test/integration/test_transactions_manager.py
@@ -112,6 +112,9 @@ class TestTransactionManager(base.BaseIntegrationTest):
 
         self._success(task.subtasks[0].uuid)
         self.assertEqual(task.status, consts.TASK_STATUSES.ready)
+        self.assertEqual(
+            consts.CLUSTER_STATUSES.operational, self.cluster.status
+        )
 
     @mock.patch('nailgun.transactions.manager.rpc')
     def test_execute_few_graphs(self, rpc_mock):
@@ -258,6 +261,9 @@ class TestTransactionManager(base.BaseIntegrationTest):
 
         self.assertEqual(rpc_mock.cast.call_count, 1)
         self.assertEqual(task.status, consts.TASK_STATUSES.error)
+        self.assertEqual(
+            consts.CLUSTER_STATUSES.partially_deployed, self.cluster.status
+        )
 
     @mock.patch('nailgun.transactions.manager.rpc')
     def test_execute_w_task(self, rpc_mock):
@@ -347,6 +353,10 @@ class TestTransactionManager(base.BaseIntegrationTest):
 
     @mock.patch('nailgun.transactions.manager.rpc')
     def test_execute_dry_run(self, rpc_mock):
+        node = self.cluster.nodes[0]
+        node.pending_roles = ['compute']
+        self.cluster.status = consts.CLUSTER_STATUSES.new
+
         task = self.manager.execute(
             graphs=[{"type": "test_graph"}], dry_run=True)
 
@@ -377,6 +387,31 @@ class TestTransactionManager(base.BaseIntegrationTest):
 
         self._success(task.subtasks[0].uuid)
         self.assertEqual(task.status, consts.TASK_STATUSES.ready)
+        self.assertEqual(['compute'], node.pending_roles)
+        self.assertEqual(consts.CLUSTER_STATUSES.new, self.cluster.status)
+
+    @mock.patch('nailgun.transactions.manager.rpc')
+    def test_execute_graph_fails_on_some_nodes(self, rpc_mock):
+        task = self.manager.execute(graphs=[{"type": "test_graph"}])
+        self.assertNotEqual(consts.TASK_STATUSES.error, task.status)
+        self.assertEqual(1, rpc_mock.cast.call_count)
+
+        self.receiver.transaction_resp(
+            task_uuid=task.uuid,
+            nodes=[
+                {'uid': n.uid, 'status': consts.NODE_STATUSES.error}
+                for n in self.cluster.nodes[:1]
+            ] + [
+                {'uid': n.uid, 'status': consts.NODE_STATUSES.ready}
+                for n in self.cluster.nodes[1:]
+            ],
+            progress=100,
+            status=consts.TASK_STATUSES.ready)
+        self._success(task.subtasks[0].uuid)
+        self.assertEqual(task.status, consts.TASK_STATUSES.ready)
+        self.assertEqual(
+            consts.CLUSTER_STATUSES.partially_deployed, self.cluster.status
+        )
 
     @mock.patch('nailgun.transactions.manager.rpc')
     def test_execute_on_one_node(self, rpc_mock):
diff --git a/nailgun/nailgun/test/unit/test_objects.py b/nailgun/nailgun/test/unit/test_objects.py
index c44914e0a9..46556c3820 100644
--- a/nailgun/nailgun/test/unit/test_objects.py
+++ b/nailgun/nailgun/test/unit/test_objects.py
@@ -933,159 +933,6 @@ class TestTaskObject(BaseIntegrationTest):
         self.assertEquals(consts.TASK_STATUSES.ready, task_obj.status)
 
 
-class TestTransactionObject(BaseIntegrationTest):
-    def setUp(self):
-        super(TestTransactionObject, self).setUp()
-        self.cluster = self.env.create(
-            nodes_kwargs=[
-                {'roles': ['controller']},
-                {'roles': ['compute']},
-                {'roles': ['cinder']}])
-
-    def test_get_last_success_run(self):
-        objects.Transaction.create({
-            'cluster_id': self.cluster.id,
-            'name': consts.TASK_NAMES.deployment,
-            'status': consts.TASK_STATUSES.pending
-        })
-        objects.Transaction.create({
-            'cluster_id': self.cluster.id,
-            'name': consts.TASK_NAMES.deployment,
-            'status': consts.TASK_STATUSES.error
-        })
-        transaction = objects.TransactionCollection.get_last_succeed_run(
-            self.cluster
-        )
-        self.assertIsNone(transaction)
-        objects.Transaction.create({
-            'cluster_id': self.cluster.id,
-            'name': consts.TASK_NAMES.deployment,
-            'status': consts.TASK_STATUSES.ready
-        })
-        finished2 = objects.Transaction.create({
-            'cluster_id': self.cluster.id,
-            'name': consts.TASK_NAMES.deployment,
-            'status': consts.TASK_STATUSES.ready
-        })
-        transaction = objects.TransactionCollection.get_last_succeed_run(
-            self.cluster
-        )
-        self.assertEqual(finished2.id, transaction.id)
-
-    def test_get_deployment_info(self):
-        transaction = objects.Transaction.create({
-            'cluster_id': self.cluster.id,
-            'name': consts.TASK_NAMES.deployment,
-            'status': consts.TASK_STATUSES.ready
-        })
-        self.assertEquals(
-            objects.Transaction.get_deployment_info(transaction),
-            {}
-        )
-        info = {'test': {'test': 'test'}}
-        objects.Transaction.attach_deployment_info(transaction, info)
-        self.assertEqual(
-            info, objects.Transaction.get_deployment_info(transaction)
-        )
-        self.assertEqual(objects.Transaction.get_deployment_info(None), {})
-
-    def test_get_cluster_settings(self):
-        transaction = objects.Transaction.create({
-            'cluster_id': self.cluster.id,
-            'name': consts.TASK_NAMES.deployment,
-            'status': consts.TASK_STATUSES.ready
-        })
-        self.assertIsNone(
-            objects.Transaction.get_cluster_settings(transaction)
-        )
-        info = {'test': 'test'}
-        objects.Transaction.attach_cluster_settings(transaction, info)
-        self.assertEqual(
-            info, objects.Transaction.get_cluster_settings(transaction)
-        )
-        self.assertIsNone(objects.Transaction.get_cluster_settings(None))
-
-    def test_get_network_settings(self):
-        transaction = objects.Transaction.create({
-            'cluster_id': self.cluster.id,
-            'name': consts.TASK_NAMES.deployment,
-            'status': consts.TASK_STATUSES.ready
-        })
-        self.assertIsNone(
-            objects.Transaction.get_network_settings(transaction)
-        )
-        info = {'test': 'test'}
-        objects.Transaction.attach_network_settings(transaction, info)
-        self.assertEqual(
-            info, objects.Transaction.get_network_settings(transaction)
-        )
-        self.assertIsNone(objects.Transaction.get_network_settings(None))
-
-    def test_get_successful_transactions_per_task(self):
-        history_collection = objects.DeploymentHistoryCollection
-        get_succeed = (
-            objects.TransactionCollection.get_successful_transactions_per_task
-        )
-        uid1 = '1'
-        uid2 = '2'
-
-        tasks_graph = {
-            None: [
-                {'id': 'post_deployment_start'},
-                {'id': 'post_deployment_end'}
-            ],
-            uid1: [{'id': 'dns-client'}]
-        }
-
-        def make_task_with_history(task_status, graph):
-            task = self.env.create_task(
-                name=consts.TASK_NAMES.deployment,
-                status=task_status,
-                cluster_id=self.cluster.id)
-
-            history_collection.create(task, graph)
-
-            history_collection.all().update(
-                {'status': consts.HISTORY_TASK_STATUSES.ready})
-            return task
-
-        # create some tasks in history
-        task1 = make_task_with_history('ready', tasks_graph)
-        transactions = get_succeed(self.cluster.id, ['dns-client']).all()
-        self.assertEqual(transactions, [(task1, uid1, 'dns-client')])
-
-        # remove 'dns-client' and add 'test' to graph for two nodes
-        tasks_graph[uid1] = tasks_graph[uid2] = [{'id': 'test'}]
-        task2 = make_task_with_history('ready', tasks_graph)
-        transactions = get_succeed(self.cluster.id, ['test']).all()
-        self.assertEqual(transactions, [(task2, uid1, 'test'),
-                                        (task2, uid2, 'test')])
-
-        # remove 'test' and add 'dns-client' to graph, leave node2 as previous
-        tasks_graph[uid1] = [{'id': 'dns-client'}]
-        task3 = make_task_with_history('ready', tasks_graph)
-        transactions = get_succeed(self.cluster.id,
-                                   ['dns-client', 'test']).all()
-
-        # now we should find both `test` and `dns-client` transactions
-        # on node 1 and onle `test` on node 2
-        self.assertEqual(
-            transactions,
-            [(task3, uid1, 'dns-client'),
-             (task2, uid1, 'test'),
-             (task3, uid2, 'test')]
-        )
-
-        # filter out node 2
-        transactions = get_succeed(self.cluster.id,
-                                   ['dns-client', 'test'], [uid1]).all()
-        self.assertEqual(
-            transactions,
-            [(task3, uid1, 'dns-client'),
-             (task2, uid1, 'test')]
-        )
-
-
 class TestActionLogObject(BaseIntegrationTest):
 
     def _create_log_entry(self, object_data):
@@ -2198,126 +2045,6 @@ class TestOpenstackConfigCollection(BaseTestCase):
         self.assertEqual(configs[0].config_type,
                          consts.OPENSTACK_CONFIG_TYPES.node)
 
-    def test_task_update_recursively(self):
-        parent = self.env.create_task(
-            name=consts.TASK_NAMES.deployment,
-            status=consts.TASK_STATUSES.pending,
-            cluster_id=self.cluster.id
-        )
-        child1 = parent.create_subtask(
-            name=consts.TASK_NAMES.deployment,
-            status=consts.TASK_STATUSES.pending
-        )
-        child2 = parent.create_subtask(
-            name=consts.TASK_NAMES.deployment,
-            status=consts.TASK_STATUSES.pending
-        )
-        # update progress for child1
-        objects.Task.update_recursively(
-            child1, {'status': consts.TASK_STATUSES.running, 'progress': 50}
-        )
-        self.assertEqual(50, child1.progress)
-        self.assertEqual(consts.TASK_STATUSES.running, child1.status)
-        self.assertEqual(25, parent.progress)
-        self.assertEqual(consts.TASK_STATUSES.running, parent.status)
-        # finish child 1
-        objects.Task.update_recursively(
-            child1, {'status': consts.TASK_STATUSES.ready}
-        )
-        self.assertEqual(100, child1.progress)
-        self.assertEqual(consts.TASK_STATUSES.ready, child1.status)
-        self.assertEqual(50, parent.progress)
-        self.assertEqual(consts.TASK_STATUSES.running, parent.status)
-        # finish child 2
-        objects.Task.update_recursively(
-            child2, {'status': consts.TASK_STATUSES.ready}
-        )
-        self.assertEqual(100, parent.progress)
-        self.assertEqual(consts.TASK_STATUSES.ready, parent.status)
-        # fail child 2 when child1 is ready
-        objects.Task.update_recursively(
-            child2, {'status': consts.TASK_STATUSES.error}
-        )
-        self.assertEqual(100, parent.progress)
-        self.assertEqual(consts.TASK_STATUSES.error, parent.status)
-        self.assertEqual(consts.TASK_STATUSES.ready, child1.status)
-        child1.status = consts.TASK_STATUSES.running
-        objects.Task.update_recursively(
-            child2, {'status': consts.TASK_STATUSES.error}
-        )
-        self.assertEqual(100, parent.progress)
-        self.assertEqual(consts.TASK_STATUSES.error, parent.status)
-        self.assertEqual(consts.TASK_STATUSES.error, child1.status)
-
-    def test_update_cluster_status_on_updating_task_status(self):
-        with mock.patch.object(objects.Task, '_update_cluster_status') as m:
-            task = self.env.create_task(
-                name=consts.TASK_NAMES.deployment,
-                status=consts.TASK_STATUSES.running,
-                cluster_id=self.cluster.id,
-                dry_run=True
-            )
-            child1 = task.create_subtask(
-                name=consts.TASK_NAMES.deployment,
-                status=consts.TASK_STATUSES.pending,
-                dry_run=True
-            )
-            child2 = task.create_subtask(
-                name=consts.TASK_NAMES.deployment,
-                status=consts.TASK_STATUSES.pending,
-                dry_run=True
-            )
-            objects.Task.update_recursively(
-                task, {'status': consts.TASK_STATUSES.error}
-            )
-            objects.Task.update_recursively(
-                task, {'status': consts.TASK_STATUSES.ready}
-            )
-
-            task.dry_run = False
-            child1.dry_run = False
-            child2.dry_run = False
-            task.status = consts.TASK_STATUSES.running
-            objects.Task.update_recursively(
-                child1, {'status': consts.TASK_STATUSES.ready}
-            )
-            self.assertEqual(0, m.call_count)
-
-            objects.Task.update_recursively(
-                child2, {'status': consts.TASK_STATUSES.ready}
-            )
-            m.assert_called_with(
-                task.cluster,
-                consts.CLUSTER_STATUSES.operational,
-                consts.NODE_STATUSES.ready
-            )
-            objects.Task.update_recursively(
-                child2, {'status': consts.TASK_STATUSES.error}
-            )
-
-            m.assert_called_with(
-                task.cluster,
-                consts.CLUSTER_STATUSES.error,
-                None
-            )
-
-            objects.Task.update_recursively(
-                task, {'status': consts.TASK_STATUSES.ready}
-            )
-            m.assert_called_with(
-                task.cluster,
-                consts.CLUSTER_STATUSES.operational,
-                consts.NODE_STATUSES.ready
-            )
-            objects.Task.update_recursively(
-                task, {'status': consts.TASK_STATUSES.error}
-            )
-            m.assert_called_with(
-                task.cluster,
-                consts.CLUSTER_STATUSES.error,
-                None
-            )
-
-
 class TestNodeStatus(BaseTestCase):
 
     def setUp(self):
diff --git a/nailgun/nailgun/test/unit/test_transaction_object.py b/nailgun/nailgun/test/unit/test_transaction_object.py
new file mode 100644
index 0000000000..2472678ac1
--- /dev/null
+++ b/nailgun/nailgun/test/unit/test_transaction_object.py
@@ -0,0 +1,173 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2014 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from nailgun.test.base import BaseTestCase
+
+from nailgun import consts
+from nailgun import objects
+
+
+class TestTransactionObject(BaseTestCase):
+    def setUp(self):
+        super(TestTransactionObject, self).setUp()
+        self.cluster = self.env.create(
+            nodes_kwargs=[
+                {'roles': ['controller']},
+                {'roles': ['compute']},
+                {'roles': ['cinder']}])
+
+    def test_get_last_success_run(self):
+        objects.Transaction.create({
+            'cluster_id': self.cluster.id,
+            'name': consts.TASK_NAMES.deployment,
+            'status': consts.TASK_STATUSES.pending
+        })
+        objects.Transaction.create({
+            'cluster_id': self.cluster.id,
+            'name': consts.TASK_NAMES.deployment,
+            'status': consts.TASK_STATUSES.error
+        })
+        transaction = objects.TransactionCollection.get_last_succeed_run(
+            self.cluster
+        )
+        self.assertIsNone(transaction)
+        objects.Transaction.create({
+            'cluster_id': self.cluster.id,
+            'name': consts.TASK_NAMES.deployment,
+            'status': consts.TASK_STATUSES.ready
+        })
+        finished2 = objects.Transaction.create({
+            'cluster_id': self.cluster.id,
+            'name': consts.TASK_NAMES.deployment,
+            'status': consts.TASK_STATUSES.ready
+        })
+        transaction = objects.TransactionCollection.get_last_succeed_run(
+            self.cluster
+        )
+        self.assertEqual(finished2.id, transaction.id)
+
+    def test_get_deployment_info(self):
+        transaction = objects.Transaction.create({
+            'cluster_id': self.cluster.id,
+            'name': consts.TASK_NAMES.deployment,
+            'status': consts.TASK_STATUSES.ready
+        })
+        self.assertEquals(
+            objects.Transaction.get_deployment_info(transaction),
+            {}
+        )
+        info = {'test': {'test': 'test'}}
+        objects.Transaction.attach_deployment_info(transaction, info)
+        self.assertEqual(
+            info, objects.Transaction.get_deployment_info(transaction)
+        )
+        self.assertEqual(objects.Transaction.get_deployment_info(None), {})
+
+    def test_get_cluster_settings(self):
+        transaction = objects.Transaction.create({
+            'cluster_id': self.cluster.id,
+            'name': consts.TASK_NAMES.deployment,
+            'status': consts.TASK_STATUSES.ready
+        })
+        self.assertIsNone(
+            objects.Transaction.get_cluster_settings(transaction)
+        )
+        info = {'test': 'test'}
+        objects.Transaction.attach_cluster_settings(transaction, info)
+        self.assertEqual(
+            info, objects.Transaction.get_cluster_settings(transaction)
+        )
+        self.assertIsNone(objects.Transaction.get_cluster_settings(None))
+
+    def test_get_network_settings(self):
+        transaction = objects.Transaction.create({
+            'cluster_id': self.cluster.id,
+            'name': consts.TASK_NAMES.deployment,
+            'status': consts.TASK_STATUSES.ready
+        })
+        self.assertIsNone(
+            objects.Transaction.get_network_settings(transaction)
+        )
+        info = {'test': 'test'}
+        objects.Transaction.attach_network_settings(transaction, info)
+        self.assertEqual(
+            info, objects.Transaction.get_network_settings(transaction)
+        )
+        self.assertIsNone(objects.Transaction.get_network_settings(None))
+
+    def test_get_successful_transactions_per_task(self):
+        history_collection = objects.DeploymentHistoryCollection
+        get_succeed = (
+            objects.TransactionCollection.get_successful_transactions_per_task
+        )
+        uid1 = '1'
+        uid2 = '2'
+
+        tasks_graph = {
+            None: [
+                {'id': 'post_deployment_start'},
+                {'id': 'post_deployment_end'}
+            ],
+            uid1: [{'id': 'dns-client'}]
+        }
+
+        def make_task_with_history(task_status, graph):
+            task = self.env.create_task(
+                name=consts.TASK_NAMES.deployment,
+                status=task_status,
+                cluster_id=self.cluster.id)
+
+            history_collection.create(task, graph)
+
+            history_collection.all().update(
+                {'status': consts.HISTORY_TASK_STATUSES.ready})
+            return task
+
+        # create some tasks in history
+        task1 = make_task_with_history('ready', tasks_graph)
+        transactions = get_succeed(self.cluster.id, ['dns-client']).all()
+        self.assertEqual(transactions, [(task1, uid1, 'dns-client')])
+
+        # remove 'dns-client' and add 'test' to graph for two nodes
+        tasks_graph[uid1] = tasks_graph[uid2] = [{'id': 'test'}]
+        task2 = make_task_with_history('ready', tasks_graph)
+        transactions = get_succeed(self.cluster.id, ['test']).all()
+        self.assertEqual(transactions, [(task2, uid1, 'test'),
+                                        (task2, uid2, 'test')])
+
+        # remove 'test' and add 'dns-client' to graph, leave node2 as previous
+        tasks_graph[uid1] = [{'id': 'dns-client'}]
+        task3 = make_task_with_history('ready', tasks_graph)
+        transactions = get_succeed(self.cluster.id,
+                                   ['dns-client', 'test']).all()
+
+        # now we should find both `test` and `dns-client` transactions
+        # on node 1 and onle `test` on node 2
+        self.assertEqual(
+            transactions,
+            [(task3, uid1, 'dns-client'),
+             (task2, uid1, 'test'),
+             (task3, uid2, 'test')]
+        )
+
+        # filter out node 2
+        transactions = get_succeed(self.cluster.id,
+                                   ['dns-client', 'test'], [uid1]).all()
+        self.assertEqual(
+            transactions,
+            [(task3, uid1, 'dns-client'),
+             (task2, uid1, 'test')]
+        )
diff --git a/nailgun/nailgun/transactions/manager.py b/nailgun/nailgun/transactions/manager.py
index ac34028b7c..30840b049c 100644
--- a/nailgun/nailgun/transactions/manager.py
+++ b/nailgun/nailgun/transactions/manager.py
@@ -86,16 +86,14 @@ class try_transaction(object):
     * create an action log record on start/finish;
 
     :param transaction: a transaction instance to be wrapped
-    :param suppress: do not propagate exception if True
     """
 
-    def __init__(self, transaction, suppress=False):
+    def __init__(self, transaction, on_error):
         self._transaction = transaction
-        self._suppress = suppress
+        self._on_error = on_error
 
     def __enter__(self):
         logger.debug("Transaction %s starts assembling.", self._transaction.id)
-        self._logitem = helpers.TaskHelper.create_action_log(self._transaction)
         return self._transaction
 
     def __exit__(self, exc_type, exc_val, exc_tb):
@@ -104,19 +102,12 @@
                 "Transaction %s failed.", self._transaction.id,
                 exc_info=(exc_type, exc_val, exc_tb)
             )
-            objects.Task.update(self._transaction, {
-                'status': consts.TASK_STATUSES.error,
-                'progress': 100,
-                'message': six.text_type(exc_val),
-            })
-            helpers.TaskHelper.update_action_log(
-                self._transaction, self._logitem
-            )
+            return self._on_error(self._transaction, six.text_type(exc_val))
         else:
             logger.debug(
                 "Transaction %s finish assembling.", self._transaction.id
            )
-            return self._suppress
+            return False
 
 
 class TransactionsManager(object):
@@ -173,6 +164,7 @@
             'status': consts.TASK_STATUSES.pending,
             'dry_run': dry_run,
         })
+        helpers.TaskHelper.create_action_log(transaction)
 
         for graph in graphs:
             # 'dry_run' flag is a part of transaction, so we can restore its
@@ -184,7 +176,7 @@
             cache = graph.copy()
             cache['force'] = force
 
-            transaction.create_subtask(
+            sub_transaction = transaction.create_subtask(
                 self.task_name,
                 status=consts.TASK_STATUSES.pending,
                 dry_run=dry_run,
@@ -195,6 +187,7 @@
                 # FIXME: Consider to use a separate set of columns.
                 cache=cache,
             )
+            helpers.TaskHelper.create_action_log(sub_transaction)
 
         # We need to commit transaction because asynchronous call below might
         # be executed in separate process or thread.
@@ -211,18 +204,30 @@
         transaction and send it to execution.
 
         :param transaction: a top-level transaction to continue
+        :return: True if sub transaction will be started, otherwise False
         """
 
-        with try_transaction(transaction, suppress=True):
+        sub_transaction = next((
+            sub_transaction
+            for sub_transaction in transaction.subtasks
+            if sub_transaction.status == consts.TASK_STATUSES.pending), None)
+
+        if sub_transaction is None:
+            # there is no sub-transaction, so we can close this transaction
+            self.success(transaction)
+            return False
+
+        with try_transaction(transaction, self.fail):
             # uWSGI mule is a separate process, and that means it won't share
             # our DB session. Hence, we can't pass fetched DB instances to the
             # function we want to be executed in mule, so let's proceed with
             # unique identifiers.
             mule.call_task_manager_async(
                 self.__class__,
-                '_continue_async',
+                '_execute_async',
                 self.cluster_id,
-                transaction.id,
+                sub_transaction.id,
             )
+        return True
 
     def process(self, transaction, report):
@@ -250,42 +255,71 @@
 
         _update_nodes(transaction, nodes_instances, nodes_params)
         _update_history(transaction, nodes)
+        _update_transaction(transaction, status, progress, error)
 
-        if status:
-            # FIXME: resolve circular dependencies by moving assemble task
-            # updates from receiver to objects layer.
-            from nailgun.rpc.receiver import NailgunReceiver
-            objects.Task.update_recursively(
-                transaction,
-                NailgunReceiver._assemble_task_update(
-                    transaction, status, progress, error, nodes_instances
-                )
-            )
+        if status in (consts.TASK_STATUSES.error, consts.TASK_STATUSES.ready):
+            helpers.TaskHelper.update_action_log(transaction)
+            if transaction.parent:
+                # if transaction is completed successfully,
+                # we've got to initiate the next one in the chain
+                if status == consts.TASK_STATUSES.ready:
+                    self.continue_(transaction.parent)
+                else:
+                    self.fail(transaction.parent, error)
 
-        # if transaction is completed successfully, we've got to initiate
-        # the next one in the chain
-        if transaction.parent and status == consts.TASK_STATUSES.ready:
-            self.continue_(transaction.parent)
+    def success(self, transaction):
+        objects.Transaction.update(
+            transaction,
+            {'status': consts.TASK_STATUSES.ready, 'progress': 100}
+        )
+        _update_cluster_status(transaction)
+        notifier.notify(
+            consts.NOTIFICATION_TOPICS.done,
+            "Graph execution has been successfully completed."
+            "You can check deployment history for detailed information.",
+            transaction.cluster_id,
+            None,
+            task_uuid=transaction.uuid
+        )
 
-    def _continue_async(self, transaction_id):
-        transaction = objects.Transaction.get_by_uid(transaction_id)
+    def fail(self, transaction, reason):
+        data = {
+            'status': consts.TASK_STATUSES.error,
+            'message': reason,
+            'progress': 100
+        }
+        objects.Transaction.update(transaction, data)
+        helpers.TaskHelper.update_action_log(transaction)
 
-        with try_transaction(transaction, suppress=True):
-            self._continue_sync(transaction)
+        data['message'] = 'Aborted'
+        for sub_transaction in transaction.subtasks:
+            if sub_transaction.status == consts.TASK_STATUSES.pending:
+                objects.Transaction.update(sub_transaction, data)
+                helpers.TaskHelper.update_action_log(sub_transaction)
+
+        _update_cluster_status(transaction)
+        notifier.notify(
+            consts.NOTIFICATION_TOPICS.error,
+            "Graph execution failed with error: '{0}'."
+            "Please check deployment history for more details."
+            .format(reason),
+            transaction.cluster_id,
+            None,
+            task_uuid=transaction.uuid
+        )
+        return True
+
+    def _execute_async(self, sub_transaction_id):
+        sub_transaction = objects.Transaction.get_by_uid(sub_transaction_id)
+
+        with try_transaction(sub_transaction.parent, self.fail):
+            self._execute_sync(sub_transaction)
 
         # Since the whole function is executed in separate process, we must
         # commit all changes in order to do not lost them.
         db().commit()
 
-    def _continue_sync(self, transaction):
-        sub_transaction = next((
-            sub_transaction
-            for sub_transaction in transaction.subtasks
-            if sub_transaction.status == consts.TASK_STATUSES.pending), None)
-
-        if sub_transaction is None:
-            return False
-
+    def _execute_sync(self, sub_transaction):
         cluster = sub_transaction.cluster
         graph = objects.Cluster.get_deployment_graph(
             cluster, sub_transaction.graph_type
@@ -296,10 +330,11 @@
             sub_transaction.cache.get('nodes')
         )
         for node in nodes:
-            node.roles = list(set(node.roles + node.pending_roles))
-            node.pending_roles = []
-            node.error_type = None
+            # set progress to show that node is in progress state
             node.progress = 1
+            if not sub_transaction.dry_run:
+                node.error_type = None
+                node.error_msg = None
 
         resolver = role_resolver.RoleResolver(nodes)
         _adjust_graph_tasks(
@@ -320,16 +355,16 @@
         # top of this.
         _dump_expected_state(sub_transaction, context.new, graph['tasks'])
 
-        with try_transaction(sub_transaction):
-            message = make_astute_message(
-                sub_transaction, context, graph, resolver)
+        message = make_astute_message(
+            sub_transaction, context, graph, resolver
+        )
 
-            # Once rpc.cast() is called, the message is sent to Astute. By
-            # that moment all transaction instanced must exist in database,
-            # otherwise we may get wrong result due to RPC receiver won't
-            # found entry to update.
-            db().commit()
-            rpc.cast('naily', [message])
+        # Once rpc.cast() is called, the message is sent to Astute. By
+        # that moment all transaction instanced must exist in database,
+        # otherwise we may get wrong result due to RPC receiver won't
+        # found entry to update.
+        db().commit()
+        rpc.cast('naily', [message])
 
     def _acquire_cluster(self):
         cluster = objects.Cluster.get_by_uid(
@@ -558,6 +593,12 @@
                 node_id=node.uid,
                 task_uuid=transaction.uuid
             )
+        elif new_status == 'ready':
+            # TODO(bgaifullin) need to remove pengind roles concept
+            node.roles = list(set(node.roles + node.pending_roles))
+            node.pending_roles = []
+            node.progress = 100
+            node.status = new_status
         else:
             node.status = new_status
     else:
@@ -581,3 +622,44 @@
             node['task_status'],
             node.get('custom'),
         )
+
+
+def _update_transaction(transaction, status, progress, message):
+    data = {}
+    if status:
+        data['status'] = status
+    if progress:
+        data['progress'] = progress
+    if message:
+        data['message'] = message
+    if data:
+        objects.Transaction.update(transaction, data)
+
+    if transaction.parent and progress:
+        siblings = transaction.parent.subtasks
+        total_progress = sum(x.progress for x in siblings)
+        objects.Transaction.update(transaction.parent, {
+            'progress': total_progress // len(siblings)
+        })
+
+
+def _update_cluster_status(transaction):
+    if transaction.dry_run:
+        return
+
+    nodes = objects.NodeCollection.filter_by(
+        None, cluster_id=transaction.cluster_id
+    )
+    failed_nodes = objects.NodeCollection.filter_by_not(nodes, error_type=None)
+    not_ready_nodes = objects.NodeCollection.filter_by_not(
+        nodes, status=consts.NODE_STATUSES.ready
+    )
+    # if all nodes are ready - cluster has operational status
+    # otherwise cluster has partially deployed status
+    if (objects.NodeCollection.count(failed_nodes) or
+            objects.NodeCollection.count(not_ready_nodes)):
+        status = consts.CLUSTER_STATUSES.partially_deployed
+    else:
+        status = consts.CLUSTER_STATUSES.operational
+
+    objects.Cluster.update(transaction.cluster, {'status': status})