diff --git a/nailgun/nailgun/api/v1/handlers/base.py b/nailgun/nailgun/api/v1/handlers/base.py index 33c933c9e3..bb4e8d64c1 100644 --- a/nailgun/nailgun/api/v1/handlers/base.py +++ b/nailgun/nailgun/api/v1/handlers/base.py @@ -549,76 +549,6 @@ class DBSingletonHandler(BaseHandler): return self.single.to_dict(instance) -# TODO(enchantner): rewrite more handlers to inherit from this -# and move more common code here -class DeferredTaskHandler(BaseHandler): - """Abstract Deferred Task Handler""" - - validator = BaseDefferedTaskValidator - single = objects.Task - log_message = u"Starting deferred task on environment '{env_id}'" - log_error = u"Error during execution of deferred task " \ - u"on environment '{env_id}': {error}" - task_manager = None - - @classmethod - def get_options(cls): - return {} - - @handle_errors - @validate - def PUT(self, cluster_id): - """:returns: JSONized Task object. - - :http: * 202 (task successfully executed) - * 400 (invalid object data specified) - * 404 (environment is not found) - * 409 (task with such parameters already exists) - """ - cluster = self.get_object_or_404( - objects.Cluster, - cluster_id, - log_404=( - u"warning", - u"Error: there is no cluster " - u"with id '{0}' in DB.".format(cluster_id) - ) - ) - - logger.info(self.log_message.format(env_id=cluster_id)) - try: - options = self.get_options() - except ValueError as e: - raise self.http(400, six.text_type(e)) - try: - self.validator.validate(cluster) - task_manager = self.task_manager(cluster_id=cluster.id) - task = task_manager.execute(**options) - except ( - errors.AlreadyExists, - errors.StopAlreadyRunning - ) as exc: - raise self.http(409, exc.message) - except ( - errors.DeploymentNotRunning, - errors.NoDeploymentTasks, - errors.WrongNodeStatus, - errors.UnavailableRelease, - errors.CannotBeStopped, - ) as exc: - raise self.http(400, exc.message) - except Exception as exc: - logger.error( - self.log_error.format( - env_id=cluster_id, - error=str(exc) - ) - ) - # let it be 500 - raise - self.raise_task(task) - - class OrchestratorDeploymentTasksHandler(SingleHandler): """Handler for deployment graph serialization.""" @@ -720,3 +650,97 @@ class TransactionExecutorHandler(BaseHandler): raise self.http(409, e.message) except errors.InvalidData as e: raise self.http(400, e.message) + + +# TODO(enchantner): rewrite more handlers to inherit from this +# and move more common code here, this is deprecated handler +class DeferredTaskHandler(TransactionExecutorHandler): + """Abstract Deferred Task Handler""" + + validator = BaseDefferedTaskValidator + single = objects.Task + log_message = u"Starting deferred task on environment '{env_id}'" + log_error = u"Error during execution of deferred task " \ + u"on environment '{env_id}': {error}" + task_manager = None + + @classmethod + def get_options(cls): + return {} + + @classmethod + def get_transaction_options(cls, cluster, options): + """Finds graph for this action.""" + return None + + @handle_errors + @validate + def PUT(self, cluster_id): + """:returns: JSONized Task object. + + :http: * 202 (task successfully executed) + * 400 (invalid object data specified) + * 404 (environment is not found) + * 409 (task with such parameters already exists) + """ + cluster = self.get_object_or_404( + objects.Cluster, + cluster_id, + log_404=( + u"warning", + u"Error: there is no cluster " + u"with id '{0}' in DB.".format(cluster_id) + ) + ) + + logger.info(self.log_message.format(env_id=cluster_id)) + try: + options = self.get_options() + except ValueError as e: + raise self.http(400, six.text_type(e)) + + try: + self.validator.validate(cluster) + except errors.NailgunException as e: + raise self.http(400, e.message) + + if objects.Release.is_lcm_supported(cluster.release): + # try to get new graph to run transaction manager + try: + transaction_options = self.get_transaction_options( + cluster, options + ) + except errors.NailgunException as e: + logger.exception("Failed to get transaction options") + raise self.http(400, msg=six.text_type(e)) + + if transaction_options: + return self.start_transaction(cluster, transaction_options) + + try: + + task_manager = self.task_manager(cluster_id=cluster.id) + task = task_manager.execute(**options) + except ( + errors.AlreadyExists, + errors.StopAlreadyRunning + ) as exc: + raise self.http(409, exc.message) + except ( + errors.DeploymentNotRunning, + errors.NoDeploymentTasks, + errors.WrongNodeStatus, + errors.UnavailableRelease, + errors.CannotBeStopped, + ) as exc: + raise self.http(400, exc.message) + except Exception as exc: + logger.error( + self.log_error.format( + env_id=cluster_id, + error=str(exc) + ) + ) + # let it be 500 + raise + self.raise_task(task) diff --git a/nailgun/nailgun/api/v1/handlers/cluster.py b/nailgun/nailgun/api/v1/handlers/cluster.py index d182e03c0e..34946a7fe7 100644 --- a/nailgun/nailgun/api/v1/handlers/cluster.py +++ b/nailgun/nailgun/api/v1/handlers/cluster.py @@ -130,6 +130,20 @@ class ClusterChangesHandler(DeferredTaskHandler): task_manager = ApplyChangesTaskManager validator = ClusterChangesValidator + @classmethod + def get_transaction_options(cls, cluster, options): + """Find sequence 'default' to use for deploy-changes handler.""" + sequence = objects.DeploymentSequence.get_by_name_for_release( + cluster.release, 'deploy-changes' + ) + if sequence: + return { + 'dry_run': options['dry_run'], + 'noop_run': options['noop_run'], + 'force': options['force'], + 'graphs': sequence.graphs, + } + @classmethod def get_options(cls): data = web.input(graph_type=None, dry_run="0", noop_run="0") @@ -142,13 +156,11 @@ class ClusterChangesHandler(DeferredTaskHandler): } -class ClusterChangesForceRedeployHandler(DeferredTaskHandler): +class ClusterChangesForceRedeployHandler(ClusterChangesHandler): log_message = u"Trying to force deployment of the environment '{env_id}'" log_error = u"Error during execution of a forced deployment task " \ u"on environment '{env_id}': {error}" - task_manager = ApplyChangesTaskManager - validator = ClusterChangesValidator @classmethod def get_options(cls): diff --git a/nailgun/nailgun/api/v1/handlers/orchestrator.py b/nailgun/nailgun/api/v1/handlers/orchestrator.py index 7b4174ec7a..f8d8b9fcfe 100644 --- a/nailgun/nailgun/api/v1/handlers/orchestrator.py +++ b/nailgun/nailgun/api/v1/handlers/orchestrator.py @@ -14,14 +14,13 @@ # License for the specific language governing permissions and limitations # under the License. -import traceback - import six import web from nailgun.api.v1.handlers.base import BaseHandler from nailgun.api.v1.handlers.base import handle_errors from nailgun.api.v1.handlers.base import serialize +from nailgun.api.v1.handlers.base import TransactionExecutorHandler from nailgun.api.v1.handlers.base import validate from nailgun.api.v1.validators.cluster import ProvisionSelectedNodesValidator from nailgun.api.v1.validators.node import DeploySelectedNodesValidator @@ -252,21 +251,66 @@ class RunMixin(object): return utils.parse_bool(web.input(noop_run='0').noop_run) -class SelectedNodesBase(NodesFilterMixin, BaseHandler): +class SelectedNodesBase(NodesFilterMixin, TransactionExecutorHandler): """Base class for running task manager on selected nodes.""" + graph_type = None + + def get_transaction_options(self, cluster, options): + if not objects.Release.is_lcm_supported(cluster.release): + # this code is actual only for lcm + return + + graph_type = options.get('graph_type') or self.graph_type + graph = graph_type and objects.Cluster.get_deployment_graph( + cluster, graph_type + ) + + if not graph or not graph['tasks']: + return + + nodes_ids = self.get_param_as_set('nodes', default=None) + if nodes_ids is not None: + nodes = self.get_objects_list_or_404( + objects.NodeCollection, nodes_ids + ) + nodes_ids = [n.id for n in nodes] + + if graph: + return { + 'noop_run': options.get('noop_run'), + 'dry_run': options.get('dry_run'), + 'force': options.get('force'), + 'graphs': [{ + 'type': graph['type'], + 'nodes': nodes_ids, + 'tasks': options.get('deployment_tasks') + }] + } + def handle_task(self, cluster, **kwargs): + if objects.Release.is_lcm_supported(cluster.release): + # this code is actual only if cluster is LCM ready + try: + transaction_options = self.get_transaction_options( + cluster, kwargs + ) + except errors.NailgunException as e: + logger.exception("Failed to get transaction options.") + raise self.http(400, six.text_type(e)) + + if transaction_options: + return self.start_transaction(cluster, transaction_options) nodes = self.get_nodes(cluster) - try: - task_manager = self.task_manager( - cluster_id=cluster.id) + task_manager = self.task_manager(cluster_id=cluster.id) task = task_manager.execute(nodes, **kwargs) except Exception as exc: - logger.warn( + logger.exception( u'Cannot execute %s task nodes: %s', - task_manager.__class__.__name__, traceback.format_exc()) + self.task_manager.__name__, ','.join(n.uid for n in nodes) + ) raise self.http(400, msg=six.text_type(exc)) self.raise_task(task) @@ -291,6 +335,8 @@ class ProvisionSelectedNodes(SelectedNodesBase): validator = ProvisionSelectedNodesValidator task_manager = manager.ProvisioningTaskManager + graph_type = 'provision' + def get_default_nodes(self, cluster): return TaskHelper.nodes_to_provision(cluster) @@ -318,6 +364,8 @@ class BaseDeploySelectedNodes(SelectedNodesBase): validator = DeploySelectedNodesValidator task_manager = manager.DeploymentTaskManager + graph_type = consts.DEFAULT_DEPLOYMENT_GRAPH_TYPE + def get_default_nodes(self, cluster): return TaskHelper.nodes_to_deploy(cluster) diff --git a/nailgun/nailgun/task/fake.py b/nailgun/nailgun/task/fake.py index 87652c0039..2c0cd95000 100644 --- a/nailgun/nailgun/task/fake.py +++ b/nailgun/nailgun/task/fake.py @@ -33,6 +33,7 @@ from nailgun.db.sqlalchemy.models import Node from nailgun import objects from nailgun.rpc.receiver import NailgunReceiver from nailgun.settings import settings +from nailgun.utils import get_in def _is_slave(node): @@ -324,6 +325,26 @@ class FakeAmpqThread(FakeThread): class FakeDeploymentThread(FakeAmpqThread): + def inject_node_status_transition(self, kwargs): + if kwargs['status'] == consts.TASK_STATUSES.ready: + selector = 'successful' + elif kwargs['status'] == consts.TASK_STATUSES.error: + selector = 'failed' + elif kwargs['status'] == consts.TASK_STATUSES.stopped: + selector = 'stopped' + else: + return + + node_statuses_transition = get_in( + self.data['args'], + 'tasks_metadata', 'node_statuses_transitions', selector + ) + if node_statuses_transition: + kwargs['nodes'] = [ + dict(uid=uid, **node_statuses_transition) + for uid in self.data['args']['tasks_graph'] + ] + def message_gen(self): # TEST: we can fail only in deployment stage here: error = self.params.get("error") @@ -398,6 +419,7 @@ class FakeDeploymentThread(FakeAmpqThread): if error_msg: kwargs['error'] = error_msg + self.inject_node_status_transition(kwargs) yield kwargs diff --git a/nailgun/nailgun/test/integration/test_orchestrator_handlers.py b/nailgun/nailgun/test/integration/test_orchestrator_handlers.py index 49a3d7f3af..891e6ac16a 100644 --- a/nailgun/nailgun/test/integration/test_orchestrator_handlers.py +++ b/nailgun/nailgun/test/integration/test_orchestrator_handlers.py @@ -311,7 +311,7 @@ class TestSelectedNodesAction(BaseSelectedNodesTest): make_query(nodes=[n.uid for n in controller_nodes], dry_run='1') self.send_put(deploy_action_url) - self.assertTrue(mcast.call_args[0][1]['args']['dry_run']) + self.assertTrue(mcast.call_args[0][1][0]['args']['dry_run']) @mock_rpc(pass_mock=True) @patch("objects.Release.is_lcm_supported") @@ -330,7 +330,7 @@ class TestSelectedNodesAction(BaseSelectedNodesTest): make_query(nodes=[n.uid for n in controller_nodes], noop_run='1') self.send_put(deploy_action_url) - self.assertTrue(mcast.call_args[0][1]['args']['noop_run']) + self.assertTrue(mcast.call_args[0][1][0]['args']['noop_run']) @mock_rpc(pass_mock=True) @patch('nailgun.task.task.rpc.cast') diff --git a/nailgun/nailgun/test/integration/test_task_deploy.py b/nailgun/nailgun/test/integration/test_task_deploy.py index bf41125801..ac31388425 100644 --- a/nailgun/nailgun/test/integration/test_task_deploy.py +++ b/nailgun/nailgun/test/integration/test_task_deploy.py @@ -496,7 +496,7 @@ class TestTaskDeploy90AfterDeployment(BaseIntegrationTest): uuid=resp.json_body['uuid'], fail_if_not_found=True ).status) - graph = rpc_cast.call_args[0][1]['args']['tasks_graph'] + graph = rpc_cast.call_args[0][1][0]['args']['tasks_graph'] # LCM: no changes - no tasks self.assertItemsEqual( @@ -523,7 +523,7 @@ class TestTaskDeploy90AfterDeployment(BaseIntegrationTest): uuid=resp.json_body['uuid'], fail_if_not_found=True ).status) - graph = rpc_cast.call_args[0][1]['args']['tasks_graph'] + graph = rpc_cast.call_args[0][1][0]['args']['tasks_graph'] # due to 'force', task must be run anyway self.assertItemsEqual( diff --git a/nailgun/nailgun/test/integration/test_task_managers.py b/nailgun/nailgun/test/integration/test_task_managers.py index 18d7e02e0b..32e0a73029 100644 --- a/nailgun/nailgun/test/integration/test_task_managers.py +++ b/nailgun/nailgun/test/integration/test_task_managers.py @@ -63,7 +63,7 @@ class TestTaskManagers(BaseIntegrationTest): {'status': consts.HISTORY_TASK_STATUSES.ready}) def set_tasks_ready(self): - objects.TaskCollection.all().update( + objects.TransactionCollection.all().update( {'status': consts.TASK_STATUSES.ready}) @fake_tasks(override_state={"progress": 100, "status": "ready"}) @@ -1366,7 +1366,7 @@ class TestTaskManagers(BaseIntegrationTest): self.db.refresh(node3) self.assertEqual(consts.NODE_STATUSES.discover, node3.status) self.assertTrue(node3.pending_addition) - tasks_graph = rpc_mock.call_args[0][1]['args']['tasks_graph'] + tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph'] self.assertItemsEqual( [consts.MASTER_NODE_UID, None] + nodes_uids, tasks_graph ) @@ -1405,7 +1405,7 @@ class TestTaskManagers(BaseIntegrationTest): self.assertNotEqual(consts.TASK_STATUSES.error, task.status) - tasks_graph = rpc_mock.call_args[0][1]['args']['tasks_graph'] + tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph'] for task in tasks_graph['master']: if task['id'] in task_ids: self.assertEqual(task['type'], 'puppet') @@ -1431,7 +1431,7 @@ class TestTaskManagers(BaseIntegrationTest): headers=self.default_headers ) self.assertIn(resp.status_code, [200, 202]) - tasks_graph = rpc_mock.call_args[0][1]['args']['tasks_graph'] + tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph'] # check that all nodes present in message self.assertItemsEqual( [n.uid for n in cluster.nodes] + [consts.MASTER_NODE_UID, None], @@ -1439,11 +1439,9 @@ class TestTaskManagers(BaseIntegrationTest): ) @mock.patch('nailgun.task.task.rpc.cast') - @mock.patch('nailgun.objects.Cluster.get_deployment_tasks') - @mock.patch('nailgun.objects.TransactionCollection' - '.get_successful_transactions_per_task') + @mock.patch('nailgun.objects.Cluster.get_deployment_graph') def check_correct_state_calculation(self, node_status, is_skip_expected, - state_mock, tasks_mock, rpc_mock): + get_graph_mock, rpc_mock): cluster = self.env.create( nodes_kwargs=[{'roles': ['controller'], 'status': consts.NODE_STATUSES.ready}], @@ -1456,28 +1454,33 @@ class TestTaskManagers(BaseIntegrationTest): task = { 'parameters': {}, 'type': 'puppet', - 'roles': ['primary-controller'], 'version': '2.1.0', + 'roles': ['/.*/'], 'version': '2.1.0', 'condition': {'yaql_exp': 'changed($.uid)'}, } - tasks_mock.return_value = [ - dict(task, id='test1'), dict(task, id='test2') - ] - state_mock.return_value = [] - + get_graph_mock.return_value = { + 'type': 'custom', + 'tasks': [dict(task, id='test1'), dict(task, id='test2')] + } + # reset progress + node.progress = 0 # deploy cluster at first time and create history supertask = self.env.launch_deployment_selected([node.uid], cluster.id) self.assertNotEqual(consts.TASK_STATUSES.error, supertask.status) self.set_history_ready() self.set_tasks_ready() + # mark test2 as skipped to ensure that it will run in next deploy + objects.DeploymentHistoryCollection.filter_by( + None, deployment_graph_task_name='test2' + ).update({'status': consts.HISTORY_TASK_STATUSES.skipped}) node.status = node_status + node.progress = 0 - state_mock.return_value = [(supertask, node.uid, 'test1')] task = self.env.launch_deployment_selected([node.uid], cluster.id) self.assertNotEqual(consts.TASK_STATUSES.error, task.status) - tasks_graph = rpc_mock.call_args[0][1]['args']['tasks_graph'] + tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph'] for task in tasks_graph[node.uid]: if task['id'] == 'test1': @@ -1491,7 +1494,7 @@ class TestTaskManagers(BaseIntegrationTest): self.assertNotEqual( task['type'], consts.ORCHESTRATOR_TASK_TYPES.skipped) else: - self.fail('Unexpected task in graph') + self.fail('Unexpected task in graph {0}'.format(task['id'])) def test_correct_state_calculation(self): self.check_correct_state_calculation(