diff --git a/nailgun/nailgun/orchestrator/task_based_deployment.py b/nailgun/nailgun/orchestrator/task_based_deployment.py index e901c38200..6102473229 100644 --- a/nailgun/nailgun/orchestrator/task_based_deployment.py +++ b/nailgun/nailgun/orchestrator/task_based_deployment.py @@ -15,7 +15,6 @@ # under the License. import collections -import copy from distutils.version import StrictVersion import itertools @@ -356,7 +355,7 @@ class TaskProcessor(object): requires_ex = current.setdefault('requires_ex', []) for node_id in previous.get('uids', ()): requires_ex.append( - {'name': previous['id'], 'node_id': node_id} + (previous['id'], node_id) ) @staticmethod @@ -387,7 +386,8 @@ class TasksSerializer(object): self.role_resolver = RoleResolver(nodes) self.task_serializer = DeployTaskSerializer() self.task_processor = TaskProcessor() - self.tasks_per_node = collections.defaultdict(dict) + self.tasks_connections = collections.defaultdict(dict) + self.tasks_dictionary = dict() self.task_filter = self.make_task_filter(task_ids) @classmethod @@ -399,15 +399,18 @@ class TasksSerializer(object): :param tasks: the list of tasks :param task_ids: Only specified tasks will be executed, If None, all tasks will be executed - :return: the list of serialized task per node + :return: the tasks dictionary, the tasks connections """ serializer = cls(cluster, nodes, task_ids) serializer.resolve_nodes(add_plugin_deployment_hooks(tasks), nodes) serializer.resolve_dependencies() - return dict( - (k, list(six.itervalues(v))) - for k, v in six.iteritems(serializer.tasks_per_node) - ) + tasks_dictionary = serializer.tasks_dictionary + tasks_connections = serializer.tasks_connections + for node_id in tasks_connections: + tasks_connections[node_id] = list( + six.itervalues(tasks_connections[node_id]) + ) + return tasks_dictionary, tasks_connections def resolve_nodes(self, tasks, nodes): """Resolves node roles in tasks. @@ -432,7 +435,7 @@ class TasksSerializer(object): self.expand_task_groups(groups, tasks_mapping) # make sure that null node is present - self.tasks_per_node.setdefault(None, dict()) + self.tasks_connections.setdefault(None, dict()) def process_task(self, task, nodes, resolver_factory, skip=False): """Processes one task one nodes of cluster. @@ -452,50 +455,65 @@ class TasksSerializer(object): # do not pass skipped attribute to astute skipped = skip or task.pop('skipped', False) or \ not task_serializer.should_execute() - for astute_task in self.task_processor.process_tasks( + for serialized in self.task_processor.process_tasks( task, task_serializer.serialize()): # all skipped task shall have type skipped # do not exclude them from graph to keep connections between nodes - if skipped: - astute_task['type'] = \ - consts.ORCHESTRATOR_TASK_TYPES.skipped - for node_id in astute_task.pop('uids', ()): - node_tasks = self.tasks_per_node[node_id] + if skipped: + task_type = consts.ORCHESTRATOR_TASK_TYPES.skipped + else: + task_type = serialized['type'] + + task_relations = { + 'id': serialized['id'], + 'type': task_type, + 'requires': serialized.pop('requires', []), + 'required_for': serialized.pop('required_for', []), + 'cross-depends': serialized.pop('cross-depends', []), + 'cross-depended-by': serialized.pop('cross-depended-by', []), + } + node_ids = serialized.pop('uids', ()) + self.tasks_dictionary[serialized['id']] = serialized + for node_id in node_ids: + node_tasks = self.tasks_connections[node_id] # de-duplication the tasks on node # since task can be added after expand group need to # overwrite if existed task is skipped and new is not skipped. - if self.need_update_task(node_tasks, astute_task): - node_tasks[astute_task['id']] = copy.deepcopy(astute_task) + if self.need_update_task(node_tasks, serialized): + node_tasks[serialized['id']] = task_relations.copy() def resolve_dependencies(self): """Resolves tasks dependencies.""" - for node_id, tasks in six.iteritems(self.tasks_per_node): + for node_id, tasks in six.iteritems(self.tasks_connections): for task in six.itervalues(tasks): - task['requires'] = list( - self.expand_dependencies( - node_id, task.get('requires'), False - ) - ) - task['required_for'] = list( - self.expand_dependencies( - node_id, task.get('required_for'), True - ) - ) - task['requires'].extend( - self.expand_cross_dependencies( - node_id, task.pop('cross-depends', None), False - ) - ) + requires = set(self.expand_dependencies( + node_id, task.pop('requires'), False + )) + requires.update(self.expand_cross_dependencies( + node_id, task.pop('cross-depends', None), False + )) + requires.update(task.pop('requires_ex', ())) - task['required_for'].extend( - self.expand_cross_dependencies( - node_id, task.pop('cross-depended-by', None), True - ) - ) - task['requires'].extend(task.pop('requires_ex', ())) - task['required_for'].extend(task.pop('required_for_ex', ())) + required_for = set(self.expand_dependencies( + node_id, task.pop('required_for'), True + )) + required_for.update(self.expand_cross_dependencies( + node_id, task.pop('cross-depended-by', None), True + )) + required_for.update(task.pop('required_for_ex', ())) + # render + if requires: + task['requires'] = [ + dict(six.moves.zip(('name', 'node_id'), r)) + for r in requires + ] + if required_for: + task['required_for'] = [ + dict(six.moves.zip(('name', 'node_id'), r)) + for r in required_for + ] def expand_task_groups(self, groups, task_mapping): """Expand group of tasks. @@ -573,11 +591,11 @@ class TasksSerializer(object): match_policy = NameMatchingPolicy.create(name) for node_id in node_ids: applied_tasks = set() - for task_name in self.tasks_per_node[node_id]: + for task_name in self.tasks_connections[node_id]: if task_name == name: # the simple case when name of current task # is exact math to name of task that is search - yield {"name": task_name, "node_id": node_id} + yield task_name, node_id continue # at first get the original task name, actual @@ -595,7 +613,7 @@ class TasksSerializer(object): task_name_gen = self.task_processor.get_last_task_id task_name = task_name_gen(original_task) - yield {"name": task_name, "node_id": node_id} + yield task_name, node_id @classmethod def need_update_task(cls, tasks, task): diff --git a/nailgun/nailgun/task/task.py b/nailgun/nailgun/task/task.py index 94d8ff1a14..98824ba04c 100644 --- a/nailgun/nailgun/task/task.py +++ b/nailgun/nailgun/task/task.py @@ -230,15 +230,21 @@ class DeploymentTask(object): @classmethod def task_deploy(cls, task, nodes, task_ids, reexecutable_filter): deployment_tasks = objects.Cluster.get_deployment_tasks(task.cluster) + logger.debug("start cluster serialization.") serialized_cluster = deployment_serializers.serialize( None, task.cluster, nodes ) - serialized_tasks = task_based_deployment.TasksSerializer.serialize( + logger.debug("finish cluster serialization.") + logger.debug("start tasks serialization.") + directory, graph = task_based_deployment.TasksSerializer.serialize( task.cluster, nodes, deployment_tasks, task_ids ) + logger.debug("finish tasks serialization.") return { "deployment_info": serialized_cluster, - "deployment_tasks": serialized_tasks + "tasks_directory": directory, + "tasks_graph": graph + } diff --git a/nailgun/nailgun/test/integration/test_task_deploy.py b/nailgun/nailgun/test/integration/test_task_deploy.py index 0fa89f9204..befc35fd1d 100644 --- a/nailgun/nailgun/test/integration/test_task_deploy.py +++ b/nailgun/nailgun/test/integration/test_task_deploy.py @@ -82,7 +82,8 @@ class TestTaskDeploy(BaseIntegrationTest): message = self.get_deploy_message() self.assertEqual("task_deploy", message["method"]) self.assertItemsEqual( - ["task_uuid", "deployment_info", "deployment_tasks"], + ["task_uuid", "deployment_info", + "tasks_directory", "tasks_graph"], message["args"] ) @@ -122,7 +123,7 @@ class TestTaskDeploy(BaseIntegrationTest): (x.uid for x in self.env.nodes if 'compute' in x.roles), None ) self.assertIsNotNone(compute_uid) - compute_tasks = message['args']['deployment_tasks'][compute_uid] + compute_tasks = message['args']['tasks_graph'][compute_uid] expected_tasks = { consts.PLUGIN_PRE_DEPLOYMENT_HOOK + "_start", @@ -167,10 +168,10 @@ class TestTaskDeploy(BaseIntegrationTest): ).status ) - deploy_tasks = rpc_cast.call_args[0][1]['args']['deployment_tasks'] + links = rpc_cast.call_args[0][1]['args']['tasks_graph'] self.assertItemsEqual( ["deploy_legacy"], - (task["id"] for task in deploy_tasks[compute.uid] + (task["id"] for task in links[compute.uid] if task['type'] != consts.ORCHESTRATOR_TASK_TYPES.skipped) ) @@ -187,7 +188,7 @@ class TestTaskDeploy(BaseIntegrationTest): self.db.flush() message = self.get_deploy_message() - deploy_tasks = message['args']['deployment_tasks'] + deploy_tasks = message['args']['tasks_graph'] self.assertIn( "netconfig", {task["id"] for task in deploy_tasks[compute.uid] diff --git a/nailgun/nailgun/test/unit/test_task_based_deployment.py b/nailgun/nailgun/test/unit/test_task_based_deployment.py index 0dae19c5ba..699d0fe339 100644 --- a/nailgun/nailgun/test/unit/test_task_based_deployment.py +++ b/nailgun/nailgun/test/unit/test_task_based_deployment.py @@ -38,18 +38,28 @@ class TestTaskSerializers(BaseTestCase): self.env.clusters[-1], self.env.nodes ) - def test_serialize(self): + def test_serialize_result(self): tasks = [ { - "id": "test1", "role": ["controller"], - "type": "stage", "version": "2.0.0" + "id": "task1", "role": ["controller"], + "type": "shell", "version": "2.0.0", + "parameters": {"cmd": "bash -c 'echo 1'"}, + "cross-depends": [{"name": "task3", "role": ["compute"]}] }, { - "id": "test2", "role": ["compute"], - "type": "stage", "version": "2.0.0" + "id": "task2", "role": ["controller"], + "type": "shell", "version": "2.0.0", + "parameters": {"cmd": "bash -c 'echo 1'"}, + "requires": ["task1"] + }, + { + "id": "task3", "role": ["compute"], + "type": "shell", "version": "2.0.0", + "parameters": {"cmd": "bash -c 'echo 2'"}, + "cross-depended-by": [{"name": "task2", "role": "*"}] }, ] - serialized = self.serializer.serialize( + tasks, connections = self.serializer.serialize( self.env.clusters[-1], self.env.nodes, tasks ) controllers = [ @@ -62,12 +72,27 @@ class TestTaskSerializers(BaseTestCase): self.assertEqual(1, len(controllers)) self.assertEqual(1, len(computes)) self.assertItemsEqual( - ["test1"], - (x["id"] for x in serialized[controllers[0]]) + [ + { + "id": "task1", + "type": "shell", + "requires": [{"name": "task3", "node_id": computes[0]}] + }, + { + "id": "task2", + "type": "shell", + "requires": [{"name": "task1", "node_id": controllers[0]}], + }, + ], + connections[controllers[0]] ) self.assertItemsEqual( - ["test2"], - (x["id"] for x in serialized[computes[0]]) + [{ + "id": "task3", + "type": "shell", + "required_for": [{"name": "task2", "node_id": controllers[0]}] + }], + connections[computes[0]] ) def test_serialize_fail_if_all_task_have_not_version_2(self): @@ -104,7 +129,7 @@ class TestTaskSerializers(BaseTestCase): ] serialized = self.serializer.serialize( self.env.clusters[-1], self.env.nodes, tasks, ids - ) + )[1] controllers = [ n.uid for n in self.env.nodes if "controller" in n.roles ] @@ -165,16 +190,16 @@ class TestTaskSerializers(BaseTestCase): self.serializer.process_task( task, ["1"], task_based_deployment.NullResolver ) - self.assertItemsEqual(["1"], self.serializer.tasks_per_node) - self.assertItemsEqual(["test"], self.serializer.tasks_per_node["1"]) + self.assertItemsEqual(["1"], self.serializer.tasks_connections) + self.assertItemsEqual(["test"], self.serializer.tasks_connections["1"]) self.assertEqual( - "test", self.serializer.tasks_per_node["1"]["test"]["id"] + "test", self.serializer.tasks_connections["1"]["test"]["id"] ) self.assertEqual( - "puppet", self.serializer.tasks_per_node["1"]["test"]["type"] + "puppet", self.serializer.tasks_connections["1"]["test"]["type"] ) self.assertNotIn( - "skipped", self.serializer.tasks_per_node["1"]["test"]["type"] + "skipped", self.serializer.tasks_connections["1"]["test"]["type"] ) def test_process_skipped_task(self): @@ -185,16 +210,16 @@ class TestTaskSerializers(BaseTestCase): self.serializer.process_task( task, ["1"], task_based_deployment.NullResolver ) - self.assertItemsEqual(["1"], self.serializer.tasks_per_node) - self.assertItemsEqual(["test"], self.serializer.tasks_per_node["1"]) + self.assertItemsEqual(["1"], self.serializer.tasks_connections) + self.assertItemsEqual(["test"], self.serializer.tasks_connections["1"]) self.assertEqual( - "test", self.serializer.tasks_per_node["1"]["test"]["id"] + "test", self.serializer.tasks_connections["1"]["test"]["id"] ) self.assertEqual( - "skipped", self.serializer.tasks_per_node["1"]["test"]["type"] + "skipped", self.serializer.tasks_connections["1"]["test"]["type"] ) self.assertNotIn( - "skipped", self.serializer.tasks_per_node["1"]["test"] + "skipped", self.serializer.tasks_connections["1"]["test"] ) def test_process_noop_task(self): @@ -202,16 +227,16 @@ class TestTaskSerializers(BaseTestCase): self.serializer.process_task( task, ["1"], task_based_deployment.NullResolver ) - self.assertItemsEqual(["1"], self.serializer.tasks_per_node) - self.assertItemsEqual(["test"], self.serializer.tasks_per_node["1"]) + self.assertItemsEqual(["1"], self.serializer.tasks_connections) + self.assertItemsEqual(["test"], self.serializer.tasks_connections["1"]) self.assertEqual( - "test", self.serializer.tasks_per_node["1"]["test"]["id"] + "test", self.serializer.tasks_connections["1"]["test"]["id"] ) self.assertEqual( - "skipped", self.serializer.tasks_per_node["1"]["test"]["type"] + "skipped", self.serializer.tasks_connections["1"]["test"]["type"] ) self.assertNotIn( - "skipped", self.serializer.tasks_per_node["1"]["test"] + "skipped", self.serializer.tasks_connections["1"]["test"] ) def test_expand_task_groups(self): @@ -230,30 +255,30 @@ class TestTaskSerializers(BaseTestCase): 'type': 'skipped', 'role': '*'} } ) - self.assertIn('1', self.serializer.tasks_per_node) - self.assertIn('2', self.serializer.tasks_per_node) + self.assertIn('1', self.serializer.tasks_connections) + self.assertIn('2', self.serializer.tasks_connections) self.assertItemsEqual( ['task1', 'task2'], - self.serializer.tasks_per_node['1'] + self.serializer.tasks_connections['1'] ) self.assertItemsEqual( - self.serializer.tasks_per_node['1'], - self.serializer.tasks_per_node['2'] + self.serializer.tasks_connections['1'], + self.serializer.tasks_connections['2'] ) def test_expand_dependencies_on_same_node(self): node_ids = ['1', '2'] - self.serializer.tasks_per_node = dict( + self.serializer.tasks_connections = dict( (node_id, ['task_{0}'.format(node_id)]) for node_id in node_ids ) - self.serializer.tasks_per_node[None] = ['sync_point'] + self.serializer.tasks_connections[None] = ['sync_point'] self.assertItemsEqual( - [{'name': 'sync_point', 'node_id': None}], + [('sync_point', None)], self.serializer.expand_dependencies('1', ['sync_point'], False) ) self.assertItemsEqual( - [{'name': 'task_1', 'node_id': '1'}], + [('task_1', '1')], self.serializer.expand_dependencies('1', ['/task/'], False) ) @@ -271,7 +296,7 @@ class TestTaskSerializers(BaseTestCase): def test_expand_cross_dependencies(self): node_ids = ['1', '2', '3'] - self.serializer.tasks_per_node = dict( + self.serializer.tasks_connections = dict( (node_id, ['task_{0}'.format(node_id)]) for node_id in node_ids ) @@ -279,7 +304,7 @@ class TestTaskSerializers(BaseTestCase): m_resolve.resolve.return_value = node_ids # the default role and policy self.assertItemsEqual( - [{'name': 'task_1', 'node_id': '1'}], + [('task_1', '1')], self.serializer.expand_cross_dependencies( '2', [{'name': 'task_1'}], True ) @@ -289,7 +314,7 @@ class TestTaskSerializers(BaseTestCase): ) # concrete role and policy self.assertItemsEqual( - [{'name': 'task_2', 'node_id': '2'}], + [('task_2', '2')], self.serializer.expand_cross_dependencies( '2', [{'name': 'task_2', 'role': ['role'], 'policy': 'any'}], @@ -302,7 +327,7 @@ class TestTaskSerializers(BaseTestCase): m_resolve.resolve.reset_mock() # use self as role self.assertItemsEqual( - [{'name': 'task_1', 'node_id': '1'}], + [('task_1', '1')], self.serializer.expand_cross_dependencies( '1', [{'name': 'task_1', 'role': 'self'}], @@ -313,45 +338,39 @@ class TestTaskSerializers(BaseTestCase): def test_resolve_relation_when_no_chains(self): node_ids = ['1', '2', '3'] - self.serializer.tasks_per_node = dict( + self.serializer.tasks_connections = dict( (node_id, ['task_{0}'.format(node_id)]) for node_id in node_ids ) self.assertItemsEqual( - [{'node_id': '1', 'name': 'task_1'}], + [('task_1', '1')], self.serializer.resolve_relation('task_1', node_ids, True) ) self.assertItemsEqual( - ({'node_id': i, 'name': 'task_{0}'.format(i)} for i in node_ids), + (('task_{0}'.format(i), i) for i in node_ids), self.serializer.resolve_relation('/task/', node_ids, True) ) def test_resolve_relation_in_chain(self): node_ids = ['1', '2', '3'] - self.serializer.tasks_per_node = dict( + self.serializer.tasks_connections = dict( (node_id, ['task_{0}'.format(node_id)]) for node_id in node_ids ) self.serializer.task_processor.origin_task_ids = { 'task_1': 'task', 'task_2': 'task', 'task_3': 'task2' } - self.serializer.tasks_per_node['1'].append('task_2') + self.serializer.tasks_connections['1'].append('task_2') self.assertItemsEqual( - [ - {'node_id': '1', 'name': 'task_start'}, - {'node_id': '2', 'name': 'task_start'}, - ], + [('task_start', '1'), ('task_start', '2')], self.serializer.resolve_relation('task', node_ids, True) ) self.assertItemsEqual( - [ - {'node_id': '1', 'name': 'task_end'}, - {'node_id': '2', 'name': 'task_end'} - ], + [('task_end', '1'), ('task_end', '2')], self.serializer.resolve_relation('task', node_ids, False) ) self.assertItemsEqual( - [{'node_id': '1', 'name': 'task_1'}], + [('task_1', '1')], self.serializer.resolve_relation('task_1', node_ids, False) ) @@ -507,19 +526,13 @@ class TestTaskProcessor(BaseTestCase): } self.processor._link_tasks(previous, current) self.assertItemsEqual( - ( - {'name': 'test_task_start', 'node_id': n} - for n in previous['uids'] - ), + (('test_task_start', n) for n in previous['uids']), current['requires_ex'] ) - current['requires_ex'] = [{'name': 'test_task_start', 'node_id': '0'}] + current['requires_ex'] = [('test_task_start', '0')] self.processor._link_tasks(previous, current) self.assertItemsEqual( - ( - {'name': 'test_task_start', 'node_id': n} - for n in ['0'] + previous['uids'] - ), + (('test_task_start', n) for n in ['0'] + previous['uids']), current['requires_ex'] )