diff --git a/nailgun/nailgun/db/sqlalchemy/models/task.py b/nailgun/nailgun/db/sqlalchemy/models/task.py index c5986c5ac0..2efeee4fb6 100644 --- a/nailgun/nailgun/db/sqlalchemy/models/task.py +++ b/nailgun/nailgun/db/sqlalchemy/models/task.py @@ -85,3 +85,7 @@ class Task(Base): self.subtasks.append(task) db().flush() return task + + def is_completed(self): + return self.status == consts.TASK_STATUSES.error or \ + self.status == consts.TASK_STATUSES.ready diff --git a/nailgun/nailgun/errors/__init__.py b/nailgun/nailgun/errors/__init__.py index 5f0a38ca35..60fe39ea1c 100644 --- a/nailgun/nailgun/errors/__init__.py +++ b/nailgun/nailgun/errors/__init__.py @@ -119,7 +119,8 @@ default_messages = { "UnknownError": "Unknown error", "UnresolvableConflict": "Unresolvable conflict", "NodeNotBelongToCluster": "The Node doesn't belong to the Cluster", - "TaskBaseDeploymentNotAllowed": "The task-based deployment is not allowed" + "TaskBaseDeploymentNotAllowed": "The task-based deployment is not allowed", + "NoChanges": "There is no changes to apply." } diff --git a/nailgun/nailgun/fixtures/deployment_tasks.yaml b/nailgun/nailgun/fixtures/deployment_tasks.yaml index 551cc33ee7..f42e7a801c 100644 --- a/nailgun/nailgun/fixtures/deployment_tasks.yaml +++ b/nailgun/nailgun/fixtures/deployment_tasks.yaml @@ -184,3 +184,13 @@ puppet_manifest: /etc/puppet/modules/osnailyfacter/modular/globals/globals.pp puppet_modules: /etc/puppet/modules timeout: 3600 + +- id: upload_configuration + type: upload_file + version: 2.0.0 + role: '*' + requires: [pre_deployment_start] + required_for: [pre_deployment_end] + parameters: + timeout: 180 + refresh_on: ['*'] diff --git a/nailgun/nailgun/task/manager.py b/nailgun/nailgun/task/manager.py index bb500b1cd4..4f3fbf1dde 100644 --- a/nailgun/nailgun/task/manager.py +++ b/nailgun/nailgun/task/manager.py @@ -62,28 +62,29 @@ class TaskManager(object): TaskHelper.update_action_log(task, al) return to_return + except errors.NoChanges as e: + self._finish_task(task, al, consts.TASK_STATUSES.ready, str(e)) except Exception as exc: - err = str(exc) if any([ not hasattr(exc, "log_traceback"), hasattr(exc, "log_traceback") and exc.log_traceback ]): logger.error(traceback.format_exc()) + self._finish_task(task, al, consts.TASK_STATUSES.error, str(exc)) - # update task entity with given data - data = {'status': 'error', - 'progress': 100, - 'message': err} - objects.Task.update(task, data) - # NOTE(romcheg): Flushing the data is required to unlock - # tasks in order to temporary fix issues with - # the deadlock detection query in tests and let the tests pass. - # TODO(akislitsky): Get rid of this flush as soon as - # task locking issues are resolved. - db().flush() - TaskHelper.update_action_log(task, al) + def _finish_task(self, task, log_item, status, message): + data = {'status': status, 'progress': 100, 'message': message} + # update task entity with given data + objects.Task.update(task, data) + # NOTE(romcheg): Flushing the data is required to unlock + # tasks in order to temporary fix issues with + # the deadlock detection query in tests and let the tests pass. + # TODO(akislitsky): Get rid of this flush as soon as + # task locking issues are resolved. + db().flush() + TaskHelper.update_action_log(task, log_item) - db().commit() + db().commit() def check_running_task(self, task_name): current_tasks = db().query(Task).filter_by( @@ -1304,6 +1305,10 @@ class OpenstackConfigTaskManager(TaskManager): fail_if_not_found=True, lock_for_update=True ) + + if task.is_completed(): + return task + # locking nodes objects.NodeCollection.lock_nodes(nodes_to_update) diff --git a/nailgun/nailgun/task/task.py b/nailgun/nailgun/task/task.py index e36def52cb..1638d4ea77 100644 --- a/nailgun/nailgun/task/task.py +++ b/nailgun/nailgun/task/task.py @@ -100,7 +100,41 @@ def fake_cast(queue, messages, **kwargs): make_thread_task_in_orchestrator(messages) -class DeploymentTask(object): +class BaseDeploymentTask(object): + @classmethod + def _get_deployment_method(cls, cluster, ignore_task_deploy=False): + """Get deployment method name based on cluster version + + :param cluster: Cluster db object + :param ignore_task_deploy: do not check that task deploy enabled + :returns: string - deploy/granular_deploy + """ + if not ignore_task_deploy and \ + objects.Cluster.is_task_deploy_enabled(cluster): + return "task_deploy" + if objects.Release.is_granular_enabled(cluster.release): + return 'granular_deploy' + return 'deploy' + + @classmethod + def call_deployment_method(cls, task, *args, **kwargs): + """Calls the deployment method with fallback. + + :param task: the Task object instance + :param args: the positional arguments + :param kwargs: the keyword arguments + """ + for flag in (False, True): + try: + method = cls._get_deployment_method(task.cluster, flag) + message = getattr(cls, method)(task, *args, **kwargs) + return method, message + except errors.TaskBaseDeploymentNotAllowed: + logger.warning("Task deploy is not allowed, " + "fallback to granular deploy.") + + +class DeploymentTask(BaseDeploymentTask): """Task for applying changes to cluster LOGIC @@ -138,21 +172,6 @@ class DeploymentTask(object): those which are prepared for removal. """ - @classmethod - def _get_deployment_method(cls, cluster, ignore_task_deploy=False): - """Get deployment method name based on cluster version - - :param cluster: Cluster db object - :param ignore_task_deploy: do not check that task deploy enabled - :returns: string - deploy/granular_deploy - """ - if not ignore_task_deploy and \ - objects.Cluster.is_task_deploy_enabled(cluster): - return "task_deploy" - if objects.Release.is_granular_enabled(cluster.release): - return 'granular_deploy' - return 'deploy' - @classmethod def message(cls, task, nodes, affected_nodes=None, deployment_tasks=None, reexecutable_filter=None): @@ -184,18 +203,9 @@ class DeploymentTask(object): n.progress = 0 db().flush() - deployment_mode = cls._get_deployment_method(task.cluster) - while True: - try: - message = getattr(cls, deployment_mode)( - task, nodes, affected_nodes, task_ids, reexecutable_filter - ) - break - except errors.TaskBaseDeploymentNotAllowed: - deployment_mode = cls._get_deployment_method( - task.cluster, True - ) - logger.warning("fallback to %s deploy.", deployment_mode) + deployment_mode, message = cls.call_deployment_method( + task, nodes, affected_nodes, task_ids, reexecutable_filter + ) # After serialization set pending_addition to False for node in nodes: @@ -1846,39 +1856,54 @@ class UpdateDnsmasqTask(object): ) -class UpdateOpenstackConfigTask(object): +class UpdateOpenstackConfigTask(BaseDeploymentTask): + + @staticmethod + def task_deploy(task, nodes, update_configs): + tasks = objects.Cluster.get_deployment_tasks(task.cluster) + events = task_based_deployment.TaskEvents( + 'refresh_on', update_configs + ) + directory, graph = task_based_deployment.TasksSerializer.serialize( + task.cluster, [], tasks, nodes, events=events + ) + return make_astute_message( + task, "task_deploy", "update_config_resp", { + "tasks_directory": directory, + "tasks_graph": graph + } + ) + + @staticmethod + def granular_deploy(task, nodes, update_configs): + refreshable_tasks = objects.Cluster.get_refreshable_tasks( + task.cluster, update_configs + ) + orchestrator_graph = deployment_graph.AstuteGraph(task.cluster) + task_ids = [t['id'] for t in refreshable_tasks] + orchestrator_graph.only_tasks(task_ids) + deployment_tasks = orchestrator_graph.stage_tasks_serialize( + orchestrator_graph.graph.topology, nodes + ) + return make_astute_message( + task, 'execute_tasks', 'update_config_resp', { + 'tasks': deployment_tasks, + }) @classmethod def message(cls, task, cluster, nodes): configs = objects.OpenstackConfigCollection.find_configs_for_nodes( cluster, nodes) - - refresh_on = set() + updated_configs = set() for config in configs: - refresh_on.update(config.configuration) + updated_configs.update(config.configuration) - refreshable_tasks = objects.Cluster.get_refreshable_tasks( - cluster, refresh_on) + if updated_configs: + updated_configs.add('*') # '*' means any config + else: + raise errors.NoChanges() - upload_serializer = tasks_serializer.UploadConfiguration( - task, task.cluster, nodes, configs) - tasks_to_execute = list(upload_serializer.serialize()) - - if refreshable_tasks: - orchestrator_graph = deployment_graph.AstuteGraph(task.cluster) - task_ids = [t['id'] for t in refreshable_tasks] - orchestrator_graph.only_tasks(task_ids) - - deployment_tasks = orchestrator_graph.stage_tasks_serialize( - orchestrator_graph.graph.topology, nodes) - tasks_to_execute.extend(deployment_tasks) - - rpc_message = make_astute_message( - task, 'execute_tasks', 'update_config_resp', { - 'tasks': tasks_to_execute, - }) - - return rpc_message + return cls.call_deployment_method(task, nodes, updated_configs)[1] if settings.FAKE_TASKS or settings.FAKE_TASKS_AMQP: diff --git a/nailgun/nailgun/test/integration/test_task_managers.py b/nailgun/nailgun/test/integration/test_task_managers.py index 326e5745e3..7d4cf7b27a 100644 --- a/nailgun/nailgun/test/integration/test_task_managers.py +++ b/nailgun/nailgun/test/integration/test_task_managers.py @@ -78,9 +78,6 @@ class TestTaskManagers(BaseIntegrationTest): provision_task = filter( lambda t: t.name == TASK_NAMES.provision, supertask.subtasks)[0] self.assertEqual(provision_task.weight, 0.4) - - wait_nodes = [self.env.nodes[0]] - self.env.wait_for_nodes_status(wait_nodes, NODE_STATUSES.provisioning) self.env.wait_ready( supertask, 60, @@ -99,7 +96,6 @@ class TestTaskManagers(BaseIntegrationTest): u"Deployment of environment '{0}' is done.".format(cluster_name), supertask.message ) - self.env.wait_for_nodes_status(wait_nodes, NODE_STATUSES.ready) self.env.refresh_nodes() for n in filter( lambda n: n.cluster_id == cluster['id'], diff --git a/nailgun/nailgun/test/unit/test_openstack_config_handler.py b/nailgun/nailgun/test/unit/test_openstack_config_handler.py index bea366e27b..de66c8baa7 100644 --- a/nailgun/nailgun/test/unit/test_openstack_config_handler.py +++ b/nailgun/nailgun/test/unit/test_openstack_config_handler.py @@ -29,14 +29,25 @@ class TestOpenstackConfigHandlers(BaseIntegrationTest): def setUp(self): super(TestOpenstackConfigHandlers, self).setUp() + release_kwargs = { + 'version': 'liberty-9.0', + 'operating_system': consts.RELEASE_OS.ubuntu, + } + cluster_kwargs = {'net_provider': 'neutron'} + self.env.create_cluster(api=False, - status=consts.CLUSTER_STATUSES.operational) + status=consts.CLUSTER_STATUSES.operational, + release_kwargs=release_kwargs, + cluster_kwargs=cluster_kwargs) self.env.create_cluster(api=False, - status=consts.CLUSTER_STATUSES.operational) + status=consts.CLUSTER_STATUSES.operational, + release_kwargs=release_kwargs, + cluster_kwargs=cluster_kwargs) self.clusters = self.env.clusters self.nodes = self.env.create_nodes( 3, cluster_id=self.clusters[0].id, + roles=["compute"], status=consts.NODE_STATUSES.ready) self.env.create_openstack_config( @@ -214,20 +225,94 @@ class TestOpenstackConfigHandlers(BaseIntegrationTest): expect_errors=True) self.assertEqual(resp.status_code, 405) - @mock.patch('nailgun.task.task.rpc.cast') - def test_openstack_config_execute(self, _): + @mock.patch('objects.Cluster.get_deployment_tasks') + def execute_update_open_stack_config(self, tasks_mock): + tasks_mock.return_value = [{ + 'id': 'upload_configuration', + 'type': 'upload_file', + 'version': '2.0.0', + 'role': '*', + 'parameters': { + 'timeout': 180, + }, + 'refresh_on': ['*'] + }] data = {'cluster_id': self.clusters[0].id} resp = self.app.put( reverse('OpenstackConfigExecuteHandler'), jsonutils.dumps(data), headers=self.default_headers ) + return resp + + @mock.patch('nailgun.task.task.rpc.cast') + def test_openstack_config_execute_with_granular_deploy(self, mock_rpc): + self.env.disable_task_deploy(self.clusters[0]) + resp = self.execute_update_open_stack_config() self.assertEqual(resp.status_code, 202) + message = mock_rpc.call_args_list[0][0][1] + self.assertEqual('execute_tasks', message['method']) + self.assertEqual('update_config_resp', message['respond_to']) + # there is no task deduplication in granular deployment + # and some of tasks can be included + # to result list more than 1 times + self.assertItemsEqual( + ((n.uid, 'upload_file') for n in self.clusters[0].nodes), + {(t['uids'][0], t['type']) for t in message['args']['tasks']} + ) + node_1_upload_config = ( + t['parameters']['data'] for t in message['args']['tasks'] + if self.nodes[1].uid in t['uids'] + ) + self.assertItemsEqual( + [ + 'configuration: {}\n', + 'configuration: {nova_config: value_1_1}\n' + ], + node_1_upload_config + ) + + @mock.patch('nailgun.task.task.rpc.cast') + def test_openstack_config_execute_with_task_deploy(self, mock_rpc): + resp = self.execute_update_open_stack_config() + self.assertEqual(resp.status_code, 202) + message = mock_rpc.call_args_list[0][0][1] + self.assertEqual('task_deploy', message['method']) + self.assertEqual('update_config_resp', message['respond_to']) + tasks_graph = message['args']['tasks_graph'] + tasks_directory = message['args']['tasks_directory'] + nodes = [n.uid for n in self.clusters[0].nodes] + nodes.append(None) + self.assertItemsEqual(nodes, tasks_graph) + self.assertEqual( + 'upload_file', + tasks_graph[nodes[0]][0]['type'] + ) + node_1_upload_config = ( + tasks_directory[t['id']]['parameters']['data'] + for t in tasks_graph[nodes[1]] + ) + self.assertItemsEqual( + [ + 'configuration: {}\n', + 'configuration: {nova_config: value_1_1}\n' + ], + node_1_upload_config + ) + + @mock.patch('objects.OpenstackConfigCollection.find_configs_for_nodes') + def test_openstack_config_completed_exit_if_no_changes(self, m_conf): + m_conf.return_value = [] + resp = self.execute_update_open_stack_config() + self.assertEqual(200, resp.status_code) + self.assertEqual(consts.TASK_STATUSES.ready, resp.json_body['status']) @mock.patch('nailgun.task.task.rpc.cast') def test_openstack_config_execute_force(self, _): # Turn node 2 into provisioned state self.env.nodes[2].status = consts.NODE_STATUSES.provisioned - self.db.flush() + # need to persistent state in database because handler will revert + # all changes on error. + self.db.commit() # Try to update OpenStack configuration for cluster data = {'cluster_id': self.clusters[0].id} resp = self.app.put(