From 2209a3181181b5e937f1cc55668f0b1d3c12f474 Mon Sep 17 00:00:00 2001
From: Bulat Gaifullin
Date: Thu, 4 Feb 2016 15:29:47 +0300
Subject: [PATCH] Moved task parameters from the graph to a separate dict

Since the task parameters are the same for all nodes, they have been
excluded from the graph and are passed to the orchestrator as a
separate argument; the graph now contains only the links between
tasks. This reduces the size of the serialized graph by more than
3 times.

Change-Id: Icc8a43df83b01a2da53056178c6dd26089e50d7b
Implements: blueprint reduce-size-of-serialized-tasks
Depends-On: I7e46946c68614789e6c59cc0a0b6ac01fbe6547b
---
 .../orchestrator/task_based_deployment.py   | 104 +++++++------
 nailgun/nailgun/task/task.py                |  10 +-
 .../test/integration/test_task_deploy.py    |  11 +-
 .../test/unit/test_task_based_deployment.py | 137 ++++++++++--------
 4 files changed, 150 insertions(+), 112 deletions(-)

diff --git a/nailgun/nailgun/orchestrator/task_based_deployment.py b/nailgun/nailgun/orchestrator/task_based_deployment.py
index e901c38200..6102473229 100644
--- a/nailgun/nailgun/orchestrator/task_based_deployment.py
+++ b/nailgun/nailgun/orchestrator/task_based_deployment.py
@@ -15,7 +15,6 @@
 # under the License.
 
 import collections
-import copy
 from distutils.version import StrictVersion
 import itertools
 
@@ -356,7 +355,7 @@ class TaskProcessor(object):
         requires_ex = current.setdefault('requires_ex', [])
         for node_id in previous.get('uids', ()):
             requires_ex.append(
-                {'name': previous['id'], 'node_id': node_id}
+                (previous['id'], node_id)
             )
 
     @staticmethod
@@ -387,7 +386,8 @@ class TasksSerializer(object):
         self.role_resolver = RoleResolver(nodes)
         self.task_serializer = DeployTaskSerializer()
         self.task_processor = TaskProcessor()
-        self.tasks_per_node = collections.defaultdict(dict)
+        self.tasks_connections = collections.defaultdict(dict)
+        self.tasks_dictionary = dict()
         self.task_filter = self.make_task_filter(task_ids)
 
     @classmethod
@@ -399,15 +399,18 @@ class TasksSerializer(object):
         :param tasks: the list of tasks
         :param task_ids: Only specified tasks will be executed,
                          If None, all tasks will be executed
-        :return: the list of serialized task per node
+        :return: the tasks dictionary and the tasks connections
         """
         serializer = cls(cluster, nodes, task_ids)
         serializer.resolve_nodes(add_plugin_deployment_hooks(tasks), nodes)
         serializer.resolve_dependencies()
-        return dict(
-            (k, list(six.itervalues(v)))
-            for k, v in six.iteritems(serializer.tasks_per_node)
-        )
+        tasks_dictionary = serializer.tasks_dictionary
+        tasks_connections = serializer.tasks_connections
+        for node_id in tasks_connections:
+            tasks_connections[node_id] = list(
+                six.itervalues(tasks_connections[node_id])
+            )
+        return tasks_dictionary, tasks_connections
 
     def resolve_nodes(self, tasks, nodes):
         """Resolves node roles in tasks.
@@ -432,7 +435,7 @@ class TasksSerializer(object):
         self.expand_task_groups(groups, tasks_mapping)
 
         # make sure that null node is present
-        self.tasks_per_node.setdefault(None, dict())
+        self.tasks_connections.setdefault(None, dict())
 
     def process_task(self, task, nodes, resolver_factory, skip=False):
         """Processes one task on nodes of the cluster.
@@ -452,50 +455,65 @@ class TasksSerializer(object):
         # do not pass the skipped attribute to astute
         skipped = skip or task.pop('skipped', False) or \
             not task_serializer.should_execute()
-        for astute_task in self.task_processor.process_tasks(
+        for serialized in self.task_processor.process_tasks(
                 task, task_serializer.serialize()):
             # all skipped tasks shall have type 'skipped';
             # do not exclude them from the graph to keep connections between nodes
-            if skipped:
-                astute_task['type'] = \
-                    consts.ORCHESTRATOR_TASK_TYPES.skipped
-            for node_id in astute_task.pop('uids', ()):
-                node_tasks = self.tasks_per_node[node_id]
+            if skipped:
+                task_type = consts.ORCHESTRATOR_TASK_TYPES.skipped
+            else:
+                task_type = serialized['type']
+
+            task_relations = {
+                'id': serialized['id'],
+                'type': task_type,
+                'requires': serialized.pop('requires', []),
+                'required_for': serialized.pop('required_for', []),
+                'cross-depends': serialized.pop('cross-depends', []),
+                'cross-depended-by': serialized.pop('cross-depended-by', []),
+            }
+            node_ids = serialized.pop('uids', ())
+            self.tasks_dictionary[serialized['id']] = serialized
+            for node_id in node_ids:
+                node_tasks = self.tasks_connections[node_id]
                 # de-duplicate tasks on the node:
                 # since a task can be added again after a group is expanded,
                 # overwrite the existing task if it is skipped and the new one is not.
-                if self.need_update_task(node_tasks, astute_task):
-                    node_tasks[astute_task['id']] = copy.deepcopy(astute_task)
+                if self.need_update_task(node_tasks, serialized):
+                    node_tasks[serialized['id']] = task_relations.copy()
 
     def resolve_dependencies(self):
         """Resolves tasks dependencies."""
-        for node_id, tasks in six.iteritems(self.tasks_per_node):
+        for node_id, tasks in six.iteritems(self.tasks_connections):
             for task in six.itervalues(tasks):
-                task['requires'] = list(
-                    self.expand_dependencies(
-                        node_id, task.get('requires'), False
-                    )
-                )
-                task['required_for'] = list(
-                    self.expand_dependencies(
-                        node_id, task.get('required_for'), True
-                    )
-                )
-                task['requires'].extend(
-                    self.expand_cross_dependencies(
-                        node_id, task.pop('cross-depends', None), False
-                    )
-                )
+                requires = set(self.expand_dependencies(
+                    node_id, task.pop('requires'), False
+                ))
+                requires.update(self.expand_cross_dependencies(
+                    node_id, task.pop('cross-depends', None), False
+                ))
+                requires.update(task.pop('requires_ex', ()))
 
-                task['required_for'].extend(
-                    self.expand_cross_dependencies(
-                        node_id, task.pop('cross-depended-by', None), True
-                    )
-                )
-                task['requires'].extend(task.pop('requires_ex', ()))
-                task['required_for'].extend(task.pop('required_for_ex', ()))
+                required_for = set(self.expand_dependencies(
+                    node_id, task.pop('required_for'), True
+                ))
+                required_for.update(self.expand_cross_dependencies(
+                    node_id, task.pop('cross-depended-by', None), True
+                ))
+                required_for.update(task.pop('required_for_ex', ()))
+                # render the relations as {'name': ..., 'node_id': ...} dicts
+                if requires:
+                    task['requires'] = [
+                        dict(six.moves.zip(('name', 'node_id'), r))
+                        for r in requires
+                    ]
+                if required_for:
+                    task['required_for'] = [
+                        dict(six.moves.zip(('name', 'node_id'), r))
+                        for r in required_for
+                    ]
 
     def expand_task_groups(self, groups, task_mapping):
         """Expand group of tasks.
@@ -573,11 +591,11 @@ class TasksSerializer(object):
         match_policy = NameMatchingPolicy.create(name)
         for node_id in node_ids:
             applied_tasks = set()
-            for task_name in self.tasks_per_node[node_id]:
+            for task_name in self.tasks_connections[node_id]:
                 if task_name == name:
                     # the simple case when the name of the current task
                     # is an exact match to the name of the task being searched
-                    yield {"name": task_name, "node_id": node_id}
+                    yield task_name, node_id
                     continue
 
                 # at first get the original task name, actual
@@ -595,7 +613,7 @@ class TasksSerializer(object):
                 task_name_gen = self.task_processor.get_last_task_id
                 task_name = task_name_gen(original_task)
 
-                yield {"name": task_name, "node_id": node_id}
+                yield task_name, node_id
 
     @classmethod
     def need_update_task(cls, tasks, task):
diff --git a/nailgun/nailgun/task/task.py b/nailgun/nailgun/task/task.py
index 94d8ff1a14..98824ba04c 100644
--- a/nailgun/nailgun/task/task.py
+++ b/nailgun/nailgun/task/task.py
@@ -230,15 +230,21 @@ class DeploymentTask(object):
     @classmethod
     def task_deploy(cls, task, nodes, task_ids, reexecutable_filter):
         deployment_tasks = objects.Cluster.get_deployment_tasks(task.cluster)
+        logger.debug("start cluster serialization.")
         serialized_cluster = deployment_serializers.serialize(
             None, task.cluster, nodes
         )
-        serialized_tasks = task_based_deployment.TasksSerializer.serialize(
+        logger.debug("finish cluster serialization.")
+        logger.debug("start tasks serialization.")
+        directory, graph = task_based_deployment.TasksSerializer.serialize(
             task.cluster, nodes, deployment_tasks, task_ids
         )
+        logger.debug("finish tasks serialization.")
         return {
             "deployment_info": serialized_cluster,
-            "deployment_tasks": serialized_tasks
+            "tasks_directory": directory,
+            "tasks_graph": graph
+        }
diff --git a/nailgun/nailgun/test/integration/test_task_deploy.py b/nailgun/nailgun/test/integration/test_task_deploy.py
index 0fa89f9204..befc35fd1d 100644
--- a/nailgun/nailgun/test/integration/test_task_deploy.py
+++ b/nailgun/nailgun/test/integration/test_task_deploy.py
@@ -82,7 +82,8 @@ class TestTaskDeploy(BaseIntegrationTest):
         message = self.get_deploy_message()
         self.assertEqual("task_deploy", message["method"])
         self.assertItemsEqual(
-            ["task_uuid", "deployment_info", "deployment_tasks"],
+            ["task_uuid", "deployment_info",
+             "tasks_directory", "tasks_graph"],
             message["args"]
         )
 
@@ -122,7 +123,7 @@ class TestTaskDeploy(BaseIntegrationTest):
             (x.uid for x in self.env.nodes if 'compute' in x.roles), None
         )
         self.assertIsNotNone(compute_uid)
-        compute_tasks = message['args']['deployment_tasks'][compute_uid]
+        compute_tasks = message['args']['tasks_graph'][compute_uid]
 
         expected_tasks = {
             consts.PLUGIN_PRE_DEPLOYMENT_HOOK + "_start",
@@ -167,10 +168,10 @@ class TestTaskDeploy(BaseIntegrationTest):
             ).status
         )
 
-        deploy_tasks = rpc_cast.call_args[0][1]['args']['deployment_tasks']
+        links = rpc_cast.call_args[0][1]['args']['tasks_graph']
         self.assertItemsEqual(
             ["deploy_legacy"],
-            (task["id"] for task in deploy_tasks[compute.uid]
+            (task["id"] for task in links[compute.uid]
              if task['type'] != consts.ORCHESTRATOR_TASK_TYPES.skipped)
         )
 
@@ -187,7 +188,7 @@ class TestTaskDeploy(BaseIntegrationTest):
         self.db.flush()
 
         message = self.get_deploy_message()
-        deploy_tasks = message['args']['deployment_tasks']
+        deploy_tasks = message['args']['tasks_graph']
         self.assertIn(
             "netconfig",
             {task["id"] for task in deploy_tasks[compute.uid]
diff --git a/nailgun/nailgun/test/unit/test_task_based_deployment.py b/nailgun/nailgun/test/unit/test_task_based_deployment.py
index 0dae19c5ba..699d0fe339
100644 --- a/nailgun/nailgun/test/unit/test_task_based_deployment.py +++ b/nailgun/nailgun/test/unit/test_task_based_deployment.py @@ -38,18 +38,28 @@ class TestTaskSerializers(BaseTestCase): self.env.clusters[-1], self.env.nodes ) - def test_serialize(self): + def test_serialize_result(self): tasks = [ { - "id": "test1", "role": ["controller"], - "type": "stage", "version": "2.0.0" + "id": "task1", "role": ["controller"], + "type": "shell", "version": "2.0.0", + "parameters": {"cmd": "bash -c 'echo 1'"}, + "cross-depends": [{"name": "task3", "role": ["compute"]}] }, { - "id": "test2", "role": ["compute"], - "type": "stage", "version": "2.0.0" + "id": "task2", "role": ["controller"], + "type": "shell", "version": "2.0.0", + "parameters": {"cmd": "bash -c 'echo 1'"}, + "requires": ["task1"] + }, + { + "id": "task3", "role": ["compute"], + "type": "shell", "version": "2.0.0", + "parameters": {"cmd": "bash -c 'echo 2'"}, + "cross-depended-by": [{"name": "task2", "role": "*"}] }, ] - serialized = self.serializer.serialize( + tasks, connections = self.serializer.serialize( self.env.clusters[-1], self.env.nodes, tasks ) controllers = [ @@ -62,12 +72,27 @@ class TestTaskSerializers(BaseTestCase): self.assertEqual(1, len(controllers)) self.assertEqual(1, len(computes)) self.assertItemsEqual( - ["test1"], - (x["id"] for x in serialized[controllers[0]]) + [ + { + "id": "task1", + "type": "shell", + "requires": [{"name": "task3", "node_id": computes[0]}] + }, + { + "id": "task2", + "type": "shell", + "requires": [{"name": "task1", "node_id": controllers[0]}], + }, + ], + connections[controllers[0]] ) self.assertItemsEqual( - ["test2"], - (x["id"] for x in serialized[computes[0]]) + [{ + "id": "task3", + "type": "shell", + "required_for": [{"name": "task2", "node_id": controllers[0]}] + }], + connections[computes[0]] ) def test_serialize_fail_if_all_task_have_not_version_2(self): @@ -104,7 +129,7 @@ class TestTaskSerializers(BaseTestCase): ] serialized = self.serializer.serialize( self.env.clusters[-1], self.env.nodes, tasks, ids - ) + )[1] controllers = [ n.uid for n in self.env.nodes if "controller" in n.roles ] @@ -165,16 +190,16 @@ class TestTaskSerializers(BaseTestCase): self.serializer.process_task( task, ["1"], task_based_deployment.NullResolver ) - self.assertItemsEqual(["1"], self.serializer.tasks_per_node) - self.assertItemsEqual(["test"], self.serializer.tasks_per_node["1"]) + self.assertItemsEqual(["1"], self.serializer.tasks_connections) + self.assertItemsEqual(["test"], self.serializer.tasks_connections["1"]) self.assertEqual( - "test", self.serializer.tasks_per_node["1"]["test"]["id"] + "test", self.serializer.tasks_connections["1"]["test"]["id"] ) self.assertEqual( - "puppet", self.serializer.tasks_per_node["1"]["test"]["type"] + "puppet", self.serializer.tasks_connections["1"]["test"]["type"] ) self.assertNotIn( - "skipped", self.serializer.tasks_per_node["1"]["test"]["type"] + "skipped", self.serializer.tasks_connections["1"]["test"]["type"] ) def test_process_skipped_task(self): @@ -185,16 +210,16 @@ class TestTaskSerializers(BaseTestCase): self.serializer.process_task( task, ["1"], task_based_deployment.NullResolver ) - self.assertItemsEqual(["1"], self.serializer.tasks_per_node) - self.assertItemsEqual(["test"], self.serializer.tasks_per_node["1"]) + self.assertItemsEqual(["1"], self.serializer.tasks_connections) + self.assertItemsEqual(["test"], self.serializer.tasks_connections["1"]) self.assertEqual( - "test", self.serializer.tasks_per_node["1"]["test"]["id"] + "test", 
self.serializer.tasks_connections["1"]["test"]["id"] ) self.assertEqual( - "skipped", self.serializer.tasks_per_node["1"]["test"]["type"] + "skipped", self.serializer.tasks_connections["1"]["test"]["type"] ) self.assertNotIn( - "skipped", self.serializer.tasks_per_node["1"]["test"] + "skipped", self.serializer.tasks_connections["1"]["test"] ) def test_process_noop_task(self): @@ -202,16 +227,16 @@ class TestTaskSerializers(BaseTestCase): self.serializer.process_task( task, ["1"], task_based_deployment.NullResolver ) - self.assertItemsEqual(["1"], self.serializer.tasks_per_node) - self.assertItemsEqual(["test"], self.serializer.tasks_per_node["1"]) + self.assertItemsEqual(["1"], self.serializer.tasks_connections) + self.assertItemsEqual(["test"], self.serializer.tasks_connections["1"]) self.assertEqual( - "test", self.serializer.tasks_per_node["1"]["test"]["id"] + "test", self.serializer.tasks_connections["1"]["test"]["id"] ) self.assertEqual( - "skipped", self.serializer.tasks_per_node["1"]["test"]["type"] + "skipped", self.serializer.tasks_connections["1"]["test"]["type"] ) self.assertNotIn( - "skipped", self.serializer.tasks_per_node["1"]["test"] + "skipped", self.serializer.tasks_connections["1"]["test"] ) def test_expand_task_groups(self): @@ -230,30 +255,30 @@ class TestTaskSerializers(BaseTestCase): 'type': 'skipped', 'role': '*'} } ) - self.assertIn('1', self.serializer.tasks_per_node) - self.assertIn('2', self.serializer.tasks_per_node) + self.assertIn('1', self.serializer.tasks_connections) + self.assertIn('2', self.serializer.tasks_connections) self.assertItemsEqual( ['task1', 'task2'], - self.serializer.tasks_per_node['1'] + self.serializer.tasks_connections['1'] ) self.assertItemsEqual( - self.serializer.tasks_per_node['1'], - self.serializer.tasks_per_node['2'] + self.serializer.tasks_connections['1'], + self.serializer.tasks_connections['2'] ) def test_expand_dependencies_on_same_node(self): node_ids = ['1', '2'] - self.serializer.tasks_per_node = dict( + self.serializer.tasks_connections = dict( (node_id, ['task_{0}'.format(node_id)]) for node_id in node_ids ) - self.serializer.tasks_per_node[None] = ['sync_point'] + self.serializer.tasks_connections[None] = ['sync_point'] self.assertItemsEqual( - [{'name': 'sync_point', 'node_id': None}], + [('sync_point', None)], self.serializer.expand_dependencies('1', ['sync_point'], False) ) self.assertItemsEqual( - [{'name': 'task_1', 'node_id': '1'}], + [('task_1', '1')], self.serializer.expand_dependencies('1', ['/task/'], False) ) @@ -271,7 +296,7 @@ class TestTaskSerializers(BaseTestCase): def test_expand_cross_dependencies(self): node_ids = ['1', '2', '3'] - self.serializer.tasks_per_node = dict( + self.serializer.tasks_connections = dict( (node_id, ['task_{0}'.format(node_id)]) for node_id in node_ids ) @@ -279,7 +304,7 @@ class TestTaskSerializers(BaseTestCase): m_resolve.resolve.return_value = node_ids # the default role and policy self.assertItemsEqual( - [{'name': 'task_1', 'node_id': '1'}], + [('task_1', '1')], self.serializer.expand_cross_dependencies( '2', [{'name': 'task_1'}], True ) @@ -289,7 +314,7 @@ class TestTaskSerializers(BaseTestCase): ) # concrete role and policy self.assertItemsEqual( - [{'name': 'task_2', 'node_id': '2'}], + [('task_2', '2')], self.serializer.expand_cross_dependencies( '2', [{'name': 'task_2', 'role': ['role'], 'policy': 'any'}], @@ -302,7 +327,7 @@ class TestTaskSerializers(BaseTestCase): m_resolve.resolve.reset_mock() # use self as role self.assertItemsEqual( - [{'name': 'task_1', 'node_id': 
'1'}], + [('task_1', '1')], self.serializer.expand_cross_dependencies( '1', [{'name': 'task_1', 'role': 'self'}], @@ -313,45 +338,39 @@ class TestTaskSerializers(BaseTestCase): def test_resolve_relation_when_no_chains(self): node_ids = ['1', '2', '3'] - self.serializer.tasks_per_node = dict( + self.serializer.tasks_connections = dict( (node_id, ['task_{0}'.format(node_id)]) for node_id in node_ids ) self.assertItemsEqual( - [{'node_id': '1', 'name': 'task_1'}], + [('task_1', '1')], self.serializer.resolve_relation('task_1', node_ids, True) ) self.assertItemsEqual( - ({'node_id': i, 'name': 'task_{0}'.format(i)} for i in node_ids), + (('task_{0}'.format(i), i) for i in node_ids), self.serializer.resolve_relation('/task/', node_ids, True) ) def test_resolve_relation_in_chain(self): node_ids = ['1', '2', '3'] - self.serializer.tasks_per_node = dict( + self.serializer.tasks_connections = dict( (node_id, ['task_{0}'.format(node_id)]) for node_id in node_ids ) self.serializer.task_processor.origin_task_ids = { 'task_1': 'task', 'task_2': 'task', 'task_3': 'task2' } - self.serializer.tasks_per_node['1'].append('task_2') + self.serializer.tasks_connections['1'].append('task_2') self.assertItemsEqual( - [ - {'node_id': '1', 'name': 'task_start'}, - {'node_id': '2', 'name': 'task_start'}, - ], + [('task_start', '1'), ('task_start', '2')], self.serializer.resolve_relation('task', node_ids, True) ) self.assertItemsEqual( - [ - {'node_id': '1', 'name': 'task_end'}, - {'node_id': '2', 'name': 'task_end'} - ], + [('task_end', '1'), ('task_end', '2')], self.serializer.resolve_relation('task', node_ids, False) ) self.assertItemsEqual( - [{'node_id': '1', 'name': 'task_1'}], + [('task_1', '1')], self.serializer.resolve_relation('task_1', node_ids, False) ) @@ -507,19 +526,13 @@ class TestTaskProcessor(BaseTestCase): } self.processor._link_tasks(previous, current) self.assertItemsEqual( - ( - {'name': 'test_task_start', 'node_id': n} - for n in previous['uids'] - ), + (('test_task_start', n) for n in previous['uids']), current['requires_ex'] ) - current['requires_ex'] = [{'name': 'test_task_start', 'node_id': '0'}] + current['requires_ex'] = [('test_task_start', '0')] self.processor._link_tasks(previous, current) self.assertItemsEqual( - ( - {'name': 'test_task_start', 'node_id': n} - for n in ['0'] + previous['uids'] - ), + (('test_task_start', n) for n in ['0'] + previous['uids']), current['requires_ex'] )
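
A minimal sketch of the payload change, for illustration only (not part of the
patch): the key names "deployment_tasks", "tasks_directory" and "tasks_graph"
and the requires/required_for link format come from the code above, while the
task id, parameters, and node uids below are invented.

    # Before: every node entry repeated the full task body, parameters included.
    deployment_tasks = {
        "1": [{"id": "netconfig", "type": "puppet",
               "parameters": {"puppet_manifest": "netconfig.pp", "timeout": 3600},
               "requires": [{"name": "globals", "node_id": "1"}]}],
        "2": [{"id": "netconfig", "type": "puppet",
               "parameters": {"puppet_manifest": "netconfig.pp", "timeout": 3600},
               "requires": [{"name": "globals", "node_id": "2"}]}],
    }

    # After: the parameters are serialized once per task id in tasks_directory...
    tasks_directory = {
        "netconfig": {"id": "netconfig", "type": "puppet",
                      "parameters": {"puppet_manifest": "netconfig.pp",
                                     "timeout": 3600}},
    }

    # ...and tasks_graph keeps only ids, types, and links, so its size no
    # longer grows with the size of the task parameters.
    tasks_graph = {
        "1": [{"id": "netconfig", "type": "puppet",
               "requires": [{"name": "globals", "node_id": "1"}]}],
        "2": [{"id": "netconfig", "type": "puppet",
               "requires": [{"name": "globals", "node_id": "2"}]}],
    }

    # Internally, resolve_dependencies now collects relations as hashable
    # (name, node_id) tuples in sets (dicts are unhashable, so the old
    # list-based code could accumulate duplicate links) and renders them
    # into dicts only at the end:
    requires = {("globals", "1"), ("netconfig", "1")}
    rendered = [dict(zip(("name", "node_id"), r)) for r in sorted(requires)]
    assert rendered == [{"name": "globals", "node_id": "1"},
                        {"name": "netconfig", "node_id": "1"}]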