From 03e98df9da2f2e3af6f2c1cbf35ef554264b38b8 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Thu, 1 Jul 2021 17:22:08 -0700 Subject: [PATCH] Use the nodeset build parameter instead of hosts/groups The serialized nodeset is now supplied as a build parameter, which makes the synthetic hosts and groups parameters which are derived from it redundant. Update the executor to rely entirely on the deserialized nodeset. We also rename the method which creates the parameters since they are not used for gearman any more. A subsequent change can remove the hosts and nodes parameters. Change-Id: Ied7f78c332485e5c66b5721c1007c25660d4238e --- tests/base.py | 6 +- tests/unit/test_executor.py | 94 ++++++++++++++++---------- tests/unit/test_web.py | 11 +++ zuul/executor/client.py | 3 +- zuul/executor/common.py | 11 +-- zuul/executor/server.py | 130 +++++++++++++++++------------------- zuul/model.py | 5 +- zuul/rpclistener.py | 2 +- 8 files changed, 149 insertions(+), 113 deletions(-) diff --git a/tests/base.py b/tests/base.py index 1a2b5025de..8a2149b2fe 100644 --- a/tests/base.py +++ b/tests/base.py @@ -3112,9 +3112,9 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob): result = (self.RESULT_NORMAL, 0) return result - def getHostList(self, args): - self.log.debug("hostlist") - hosts = super(RecordingAnsibleJob, self).getHostList(args) + def getHostList(self, args, nodes): + self.log.debug("hostlist %s", nodes) + hosts = super(RecordingAnsibleJob, self).getHostList(args, nodes) for host in hosts: if not host['host_vars'].get('ansible_connection'): host['host_vars']['ansible_connection'] = 'local' diff --git a/tests/unit/test_executor.py b/tests/unit/test_executor.py index 96de62977b..9b776d0f2b 100644 --- a/tests/unit/test_executor.py +++ b/tests/unit/test_executor.py @@ -32,9 +32,9 @@ from tests.base import ( from zuul.executor.sensors.startingbuilds import StartingBuildsSensor from zuul.executor.sensors.ram import RAMSensor -from zuul.executor.server import AnsibleJob, squash_variables +from zuul.executor.server import AnsibleJob, JobDir, squash_variables from zuul.lib.ansible import AnsibleManager -from zuul.model import BuildRequest +from zuul.model import BuildRequest, NodeSet, Group class TestExecutorRepos(ZuulTestCase): @@ -456,59 +456,86 @@ class TestAnsibleJob(ZuulTestCase): ) self.test_job = AnsibleJob(self.executor_server, build_request, params) + self.test_job.jobdir = JobDir(self.executor_server.jobdir_root, + self.executor_server.keep_jobdir, + str(build_request.uuid)) - def test_getHostList_host_keys(self): + def test_prepareNodes_host_keys(self): # Test without connection_port set node = {'name': 'fake-host', + 'label': 'fake-label', + 'state': 'ready', + 'cloud': 'fake', 'host_keys': ['fake-host-key'], 'interface_ip': 'localhost'} - keys = self.test_job.getHostList({'nodes': [node], - 'host_vars': {}, - 'vars': {}, - 'groups': [], - })[0]['host_keys'] + nodeset = { + "name": "dummy-node", + "node_request_id": 0, + "nodes": [node], + "groups": [], + } + self.test_job.nodeset = NodeSet.fromDict(nodeset) + self.test_job.prepareNodes({'host_vars': {}, + 'vars': {}, + 'groups': [], + }) + keys = self.test_job.host_list[0]['host_keys'] self.assertEqual(keys[0], 'localhost fake-host-key') # Test with custom connection_port set node['connection_port'] = 22022 - keys = self.test_job.getHostList({'nodes': [node], - 'host_vars': {}, - 'vars': {}, - 'groups': [], - })[0]['host_keys'] + self.test_job.nodeset = NodeSet.fromDict(nodeset) + self.test_job.prepareNodes({'host_vars': {}, + 
'vars': {}, + 'groups': [], + }) + keys = self.test_job.host_list[0]['host_keys'] self.assertEqual(keys[0], '[localhost]:22022 fake-host-key') # Test with no host keys node['host_keys'] = [] - host = self.test_job.getHostList({'nodes': [node], - 'host_vars': {}, - 'vars': {}, - 'groups': [], - })[0] + self.test_job.nodeset = NodeSet.fromDict(nodeset) + self.test_job.prepareNodes({'nodes': [node], + 'host_vars': {}, + 'vars': {}, + 'groups': [], + }) + host = self.test_job.host_list[0] self.assertEqual(host['host_keys'], []) self.assertEqual( host['host_vars']['ansible_ssh_common_args'], '-o StrictHostKeyChecking=false') - def test_getHostList_shell_type(self): + def test_prepareNodes_shell_type(self): # Test without shell type set node = {'name': 'fake-host', + 'label': 'fake-label', + 'state': 'ready', + 'cloud': 'fake', 'host_keys': ['fake-host-key'], 'interface_ip': 'localhost'} - host = self.test_job.getHostList({'nodes': [node], - 'host_vars': {}, - 'vars': {}, - 'groups': [], - })[0] + nodeset = { + "name": "dummy-node", + "node_request_id": 0, + "nodes": [node], + "groups": [], + } + self.test_job.nodeset = NodeSet.fromDict(nodeset) + self.test_job.prepareNodes({'host_vars': {}, + 'vars': {}, + 'groups': [], + }) + host = self.test_job.host_list[0] self.assertNotIn('ansible_shell_type', host['host_vars']) # Test with custom shell type set. node['shell_type'] = 'cmd' - host = self.test_job.getHostList({'nodes': [node], - 'host_vars': {}, - 'vars': {}, - 'groups': [], - })[0] + self.test_job.nodeset = NodeSet.fromDict(nodeset) + self.test_job.prepareNodes({'host_vars': {}, + 'vars': {}, + 'groups': [], + }) + host = self.test_job.host_list[0] self.assertIn('ansible_shell_type', host['host_vars']) self.assertEqual( host['host_vars']['ansible_shell_type'], @@ -978,6 +1005,7 @@ class TestExecutorExtraPackages(AnsibleZuulTestCase): class TestVarSquash(BaseTestCase): def test_squash_variables(self): # Test that we correctly squash job variables + nodeset = NodeSet() nodes = [ {'name': 'node1', 'host_vars': { 'host': 'node1_host', @@ -988,10 +1016,8 @@ class TestVarSquash(BaseTestCase): 'extra': 'node2_extra', }}, ] - groups = [ - {'name': 'group1', 'nodes': ['node1']}, - {'name': 'group2', 'nodes': ['node2']}, - ] + nodeset.addGroup(Group('group1', ['node1'])) + nodeset.addGroup(Group('group2', ['node2'])) groupvars = { 'group1': { 'host': 'group1_host', @@ -1017,7 +1043,7 @@ class TestVarSquash(BaseTestCase): 'extra': 'extravar_extra', } out = squash_variables( - nodes, groups, jobvars, groupvars, extravars) + nodes, nodeset, jobvars, groupvars, extravars) expected = { 'node1': { diff --git a/tests/unit/test_web.py b/tests/unit/test_web.py index 2cc2547a59..72cc73c8ba 100644 --- a/tests/unit/test_web.py +++ b/tests/unit/test_web.py @@ -1026,6 +1026,17 @@ class TestWeb(BaseTestWeb): 'name': ['controller'], 'state': 'unknown' }], + 'nodeset': { + 'groups': [], + 'name': '', + 'nodes': [ + {'aliases': [], + 'comment': None, + 'hold_job': None, + 'id': None, + 'label': 'label1', + 'name': 'controller', + 'state': 'unknown'}]}, 'override_branch': None, 'override_checkout': None, 'repo_state': {}, diff --git a/zuul/executor/client.py b/zuul/executor/client.py index 9e010f2119..815baf9b0b 100644 --- a/zuul/executor/client.py +++ b/zuul/executor/client.py @@ -59,7 +59,7 @@ class ExecutorClient(object): "with dependent changes %s", job, uuid, nodeset, item.change, dependent_changes) - params = zuul.executor.common.construct_gearman_params( + params = zuul.executor.common.construct_build_params( 
uuid, self.sched, nodeset, job, item, pipeline, dependent_changes, merger_items, redact_secrets_and_keys=False) @@ -102,7 +102,6 @@ class ExecutorClient(object): # Store the NodeRequest ID in the job arguments, so we can look it up # on the executor side to lock the nodes. - params["nodeset"] = nodeset.toDict() node_request = build.build_set.getJobNodeRequest(job.name) if node_request: params["noderequest_id"] = node_request.id diff --git a/zuul/executor/common.py b/zuul/executor/common.py index 48c4c421c9..1e1951d515 100644 --- a/zuul/executor/common.py +++ b/zuul/executor/common.py @@ -17,12 +17,12 @@ import os from zuul.lib import strings -def construct_gearman_params(uuid, sched, nodeset, job, item, pipeline, - dependent_changes=[], merger_items=[], - redact_secrets_and_keys=True): +def construct_build_params(uuid, sched, nodeset, job, item, pipeline, + dependent_changes=[], merger_items=[], + redact_secrets_and_keys=True): """Returns a list of all the parameters needed to build a job. - These parameters may be passed to zuul-executors (via gearman) to perform + These parameters may be passed to zuul-executors (via ZK) to perform the job itself. Alternatively they contain enough information to load into another build @@ -124,6 +124,8 @@ def construct_gearman_params(uuid, sched, nodeset, job, item, pipeline, params['cleanup_playbooks'] = [make_playbook(x) for x in job.cleanup_run] + # TODO(corvus): Remove nodes and groups since they're included in + # nodeset nodes = [] for node in nodeset.getNodes(): n = node.toDict() @@ -131,6 +133,7 @@ def construct_gearman_params(uuid, sched, nodeset, job, item, pipeline, nodes.append(n) params['nodes'] = nodes params['groups'] = [group.toDict() for group in nodeset.getGroups()] + params["nodeset"] = nodeset.toDict() params['ssh_keys'] = [] if pipeline.post_review: if redact_secrets_and_keys: diff --git a/zuul/executor/server.py b/zuul/executor/server.py index a2267c6dde..8293f52068 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -753,8 +753,7 @@ class DeduplicateQueue(object): self.condition.release() -def squash_variables(nodes, groups, jobvars, groupvars, - extravars): +def squash_variables(nodes, nodeset, jobvars, groupvars, extravars): """Combine the Zuul job variable parameters into a hostvars dictionary. This is used by the executor when freezing job variables. It @@ -763,10 +762,9 @@ def squash_variables(nodes, groups, jobvars, groupvars, therefore group vars and extra vars can be combined in such a way to present a single hierarchy of variables visible to each host). - :param list nodes: A list of node dictionaries (as supplied by - the executor client) - :param dict groups: A list of group dictionaries (as supplied by - the executor client) + :param list nodes: A list of node dictionaries (as returned by + getHostList) + :param Nodeset nodeset: A nodeset (used for group membership). :param dict jobvars: A dictionary corresponding to Zuul's job.vars. :param dict groupvars: A dictionary keyed by group name with a value of a dictionary of variables for that group. @@ -781,18 +779,18 @@ def squash_variables(nodes, groups, jobvars, groupvars, # Zuul runs ansible with the default hash behavior of 'replace'; # this means we don't need to deep-merge dictionaries. 
+ groups = sorted(nodeset.getGroups(), key=lambda g: g.name) for node in nodes: hostname = node['name'] ret[hostname] = {} # group 'all' ret[hostname].update(jobvars) # group vars - groups = sorted(groups, key=lambda g: g['name']) if 'all' in groupvars: ret[hostname].update(groupvars.get('all', {})) for group in groups: - if hostname in group['nodes']: - ret[hostname].update(groupvars.get(group['name'], {})) + if hostname in group.nodes: + ret[hostname].update(groupvars.get(group.name, {})) # host vars ret[hostname].update(node['host_vars']) # extra vars @@ -818,16 +816,16 @@ def make_setup_inventory_dict(nodes, hostvars): return inventory -def is_group_var_set(name, host, args): - for group in args['groups']: - if host in group['nodes']: - group_vars = args['group_vars'].get(group['name'], {}) +def is_group_var_set(name, host, nodeset, args): + for group in nodeset.getGroups(): + if host in group.nodes: + group_vars = args['group_vars'].get(group.name, {}) if name in group_vars: return True return False -def make_inventory_dict(nodes, groups, hostvars, remove_keys=None): +def make_inventory_dict(nodes, nodeset, hostvars, remove_keys=None): hosts = {} for node in nodes: node_hostvars = hostvars[node['name']].copy() @@ -851,16 +849,16 @@ def make_inventory_dict(nodes, groups, hostvars, remove_keys=None): } } - for group in groups: + for group in nodeset.getGroups(): if 'children' not in inventory['all']: inventory['all']['children'] = dict() group_hosts = {} - for node_name in group['nodes']: + for node_name in group.nodes: group_hosts[node_name] = None inventory['all']['children'].update({ - group['name']: { + group.name: { 'hosts': group_hosts, }}) @@ -1759,9 +1757,9 @@ class AnsibleJob(object): "Could not decode json from {logfile}".format( logfile=json_output)) - def getHostList(self, args): + def getHostList(self, args, nodes): hosts = [] - for node in args['nodes']: + for node in nodes: # NOTE(mordred): This assumes that the nodepool launcher # and the zuul executor both have similar network # characteristics, as the launcher will do a test for ipv6 @@ -1770,9 +1768,9 @@ class AnsibleJob(object): # set to True in the clouds.yaml for a cloud if this # results in the wrong thing being in interface_ip # TODO(jeblair): Move this notice to the docs. 
- for name in node['name']: - ip = node.get('interface_ip') - port = node.get('connection_port', node.get('ssh_port', 22)) + for name in node.name: + ip = node.interface_ip + port = node.connection_port host_vars = args['host_vars'].get(name, {}).copy() check_varnames(host_vars) host_vars.update(dict( @@ -1780,18 +1778,18 @@ class AnsibleJob(object): ansible_user=self.executor_server.default_username, ansible_port=port, nodepool=dict( - label=node.get('label'), - az=node.get('az'), - cloud=node.get('cloud'), - provider=node.get('provider'), - region=node.get('region'), - host_id=node.get('host_id'), - external_id=node.get('external_id'), - interface_ip=node.get('interface_ip'), - public_ipv4=node.get('public_ipv4'), - private_ipv4=node.get('private_ipv4'), - public_ipv6=node.get('public_ipv6'), - private_ipv6=node.get('private_ipv6')))) + label=node.label, + az=node.az, + cloud=node.cloud, + provider=node.provider, + region=node.region, + host_id=node.host_id, + external_id=getattr(node, 'external_id', None), + interface_ip=node.interface_ip, + public_ipv4=node.public_ipv4, + private_ipv4=node.private_ipv4, + public_ipv6=node.public_ipv6, + private_ipv6=node.private_ipv6))) # Ansible >=2.8 introduced "auto" as an # ansible_python_interpreter argument that looks up @@ -1804,15 +1802,15 @@ class AnsibleJob(object): # user control. api = 'ansible_python_interpreter' if (api not in args['vars'] and - not is_group_var_set(api, name, args)): - python = node.get('python_path', 'auto') + not is_group_var_set(api, name, self.nodeset, args)): + python = getattr(node, 'python_path', 'auto') host_vars.setdefault(api, python) - username = node.get('username') + username = node.username if username: host_vars['ansible_user'] = username - connection_type = node.get('connection_type') + connection_type = node.connection_type if connection_type: host_vars['ansible_connection'] = connection_type if connection_type == "winrm": @@ -1835,19 +1833,19 @@ class AnsibleJob(object): self.winrm_read_timeout elif connection_type == "kubectl": host_vars['ansible_kubectl_context'] = \ - node.get('kubectl_context') + getattr(node, 'kubectl_context', None) - shell_type = node.get('shell_type') + shell_type = getattr(node, 'shell_type', None) if shell_type: host_vars['ansible_shell_type'] = shell_type host_keys = [] - for key in node.get('host_keys', []): + for key in getattr(node, 'host_keys', []): if port != 22: host_keys.append("[%s]:%s %s" % (ip, port, key)) else: host_keys.append("%s %s" % (ip, key)) - if not node.get('host_keys'): + if not getattr(node, 'host_keys', None): host_vars['ansible_ssh_common_args'] = \ '-o StrictHostKeyChecking=false' @@ -2295,32 +2293,33 @@ class AnsibleJob(object): def prepareNodes(self, args): # Returns the zuul.resources ansible variable for later user - # Used to remove resource nodes from the inventory - resources_nodes = [] + # The (non-resource) nodes we want to keep in the inventory + inventory_nodes = [] # The zuul.resources ansible variable zuul_resources = {} - for node in args['nodes']: - if node.get('connection_type') in ( + for node in self.nodeset.getNodes(): + if node.connection_type in ( 'namespace', 'project', 'kubectl'): # TODO: decrypt resource data using scheduler key - data = node['connection_port'] + data = node.connection_port # Setup kube/config file self.prepareKubeConfig(self.jobdir, data) # Convert connection_port in kubectl connection parameters - node['connection_port'] = None - node['kubectl_namespace'] = data['namespace'] - node['kubectl_context'] = 
data['context_name'] + node.connection_port = None + node.kubectl_namespace = data['namespace'] + node.kubectl_context = data['context_name'] # Add node information to zuul.resources - zuul_resources[node['name'][0]] = { + zuul_resources[node.name[0]] = { 'namespace': data['namespace'], 'context': data['context_name'], } - if node['connection_type'] in ('project', 'namespace'): + if node.connection_type in ('project', 'namespace'): # Project are special nodes that are not the inventory - resources_nodes.append(node) + pass else: + inventory_nodes.append(node) # Add the real pod name to the resources_var - zuul_resources[node['name'][0]]['pod'] = data['pod'] + zuul_resources[node.name[0]]['pod'] = data['pod'] fwd = KubeFwd(zuul_event_id=self.zuul_event_id, build=self.build_request.uuid, @@ -2331,18 +2330,17 @@ class AnsibleJob(object): try: fwd.start() self.port_forwards.append(fwd) - zuul_resources[node['name'][0]]['stream_port'] = \ + zuul_resources[node.name[0]]['stream_port'] = \ fwd.port except Exception: self.log.exception("Unable to start port forward:") self.log.error("Kubectl and socat are required for " "streaming logs") + else: + # A normal node to include in inventory + inventory_nodes.append(node) - # Remove resource node from nodes list - for node in resources_nodes: - args['nodes'].remove(node) - - self.host_list = self.getHostList(args) + self.host_list = self.getHostList(args, inventory_nodes) with open(self.jobdir.known_hosts, 'w') as known_hosts: for node in self.host_list: @@ -2356,8 +2354,8 @@ class AnsibleJob(object): # Check the group and extra var names for safety; they'll get # merged later - for group in args['groups']: - group_vars = args['group_vars'].get(group['name'], {}) + for group in self.nodeset.getGroups(): + group_vars = args['group_vars'].get(group.name, {}) check_varnames(group_vars) check_varnames(args['extra_vars']) @@ -2390,7 +2388,7 @@ class AnsibleJob(object): } host_list = self.host_list + [localhost] self.original_hostvars = squash_variables( - host_list, args['groups'], all_vars, + host_list, self.nodeset, all_vars, args['group_vars'], args['extra_vars']) def loadFrozenHostvars(self): @@ -2426,9 +2424,8 @@ class AnsibleJob(object): def writeDebugInventory(self): # This file is unused by Zuul, but the base jobs copy it to logs # for debugging, so let's continue to put something there. 
- args = self.arguments inventory = make_inventory_dict( - self.host_list, args['groups'], self.original_hostvars) + self.host_list, self.nodeset, self.original_hostvars) inventory['all']['vars']['zuul'] = self.zuul_vars with open(self.jobdir.inventory, 'w') as inventory_yaml: @@ -2449,9 +2446,8 @@ class AnsibleJob(object): default_flow_style=False)) def writeInventory(self, jobdir_playbook, hostvars): - args = self.arguments inventory = make_inventory_dict( - self.host_list, args['groups'], hostvars, + self.host_list, self.nodeset, hostvars, remove_keys=jobdir_playbook.secrets_keys) with open(jobdir_playbook.inventory, 'w') as inventory_yaml: diff --git a/zuul/model.py b/zuul/model.py index 5d071cde2a..d9e76bfdb7 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -658,7 +658,7 @@ class Node(ConfigObject): self._state = data['state'] keys = [] for k, v in data.items(): - if k == 'state': + if k in ['state', 'name', 'aliases']: continue keys.append(k) setattr(self, k, v) @@ -666,7 +666,8 @@ class Node(ConfigObject): @classmethod def fromDict(cls, data): - node = cls(data["name"], data["label"]) + aliases = data.get('aliases', []) + node = cls([data["name"]] + aliases, data["label"]) node.updateFromDict(data) return node diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py index c0be6e9eca..c46e070896 100644 --- a/zuul/rpclistener.py +++ b/zuul/rpclistener.py @@ -509,7 +509,7 @@ class RPCListener(RPCListenerBase): nodeset = job.nodeset job.setBase(tenant.layout) uuid = '0' * 32 - params = zuul.executor.common.construct_gearman_params( + params = zuul.executor.common.construct_build_params( uuid, self.sched, nodeset, job, item, pipeline) gear_job.sendWorkComplete(json.dumps(params, cls=ZuulJSONEncoder))
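
Reviewer note (illustrative only, not part of the patch): a minimal sketch of the
round trip this change relies on, using the same nodeset dict shape as the updated
tests above. The executor-side deserialization of params["nodeset"] happens outside
this diff, so that step is an assumption here, and the variable names below
(params, build_nodeset) are hypothetical.

    from zuul.model import NodeSet

    # Scheduler/client side (see zuul/executor/common.py above): the whole
    # nodeset is serialized straight into the build parameters, which is what
    # makes the synthetic 'nodes'/'groups' parameters redundant.
    params = {
        "nodeset": {
            "name": "dummy-node",
            "node_request_id": 0,
            "nodes": [{"name": "fake-host",
                       "label": "fake-label",
                       "state": "ready",
                       "cloud": "fake",
                       "host_keys": ["fake-host-key"],
                       "interface_ip": "localhost"}],
            "groups": [],
        },
        "group_vars": {},
    }

    # Executor side (assumed; mirrors what the updated unit tests do): rebuild
    # the NodeSet from the build parameter and derive hosts and group
    # membership from Node/Group objects instead of the old dict parameters.
    build_nodeset = NodeSet.fromDict(params["nodeset"])
    for node in build_nodeset.getNodes():
        print(node.name, node.label, node.interface_ip)
    for group in build_nodeset.getGroups():
        group_vars = params["group_vars"].get(group.name, {})

With the NodeSet available on the executor, squash_variables() reads group
membership from Group objects (group.name, group.nodes) rather than from the old
group dictionaries; the variable precedence it applies is unchanged: job vars,
then group vars, then host vars, then extra vars.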