Browse Source

Merge "Use the nodeset build parameter instead of hosts/groups"

changes/49/801749/2
Zuul 2 months ago
committed by Gerrit Code Review
parent
commit
a181dc5da7
  1. 6
      tests/base.py
  2. 94
      tests/unit/test_executor.py
  3. 11
      tests/unit/test_web.py
  4. 3
      zuul/executor/client.py
  5. 11
      zuul/executor/common.py
  6. 130
      zuul/executor/server.py
  7. 5
      zuul/model.py
  8. 2
      zuul/rpclistener.py

6
tests/base.py

@ -3113,9 +3113,9 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
result = (self.RESULT_NORMAL, 0)
return result
def getHostList(self, args):
self.log.debug("hostlist")
hosts = super(RecordingAnsibleJob, self).getHostList(args)
def getHostList(self, args, nodes):
self.log.debug("hostlist %s", nodes)
hosts = super(RecordingAnsibleJob, self).getHostList(args, nodes)
for host in hosts:
if not host['host_vars'].get('ansible_connection'):
host['host_vars']['ansible_connection'] = 'local'

94
tests/unit/test_executor.py

@ -32,9 +32,9 @@ from tests.base import (
from zuul.executor.sensors.startingbuilds import StartingBuildsSensor
from zuul.executor.sensors.ram import RAMSensor
from zuul.executor.server import AnsibleJob, squash_variables
from zuul.executor.server import AnsibleJob, JobDir, squash_variables
from zuul.lib.ansible import AnsibleManager
from zuul.model import BuildRequest
from zuul.model import BuildRequest, NodeSet, Group
class TestExecutorRepos(ZuulTestCase):
@ -456,59 +456,86 @@ class TestAnsibleJob(ZuulTestCase):
)
self.test_job = AnsibleJob(self.executor_server, build_request, params)
self.test_job.jobdir = JobDir(self.executor_server.jobdir_root,
self.executor_server.keep_jobdir,
str(build_request.uuid))
def test_getHostList_host_keys(self):
def test_prepareNodes_host_keys(self):
# Test without connection_port set
node = {'name': 'fake-host',
'label': 'fake-label',
'state': 'ready',
'cloud': 'fake',
'host_keys': ['fake-host-key'],
'interface_ip': 'localhost'}
keys = self.test_job.getHostList({'nodes': [node],
'host_vars': {},
'vars': {},
'groups': [],
})[0]['host_keys']
nodeset = {
"name": "dummy-node",
"node_request_id": 0,
"nodes": [node],
"groups": [],
}
self.test_job.nodeset = NodeSet.fromDict(nodeset)
self.test_job.prepareNodes({'host_vars': {},
'vars': {},
'groups': [],
})
keys = self.test_job.host_list[0]['host_keys']
self.assertEqual(keys[0], 'localhost fake-host-key')
# Test with custom connection_port set
node['connection_port'] = 22022
keys = self.test_job.getHostList({'nodes': [node],
'host_vars': {},
'vars': {},
'groups': [],
})[0]['host_keys']
self.test_job.nodeset = NodeSet.fromDict(nodeset)
self.test_job.prepareNodes({'host_vars': {},
'vars': {},
'groups': [],
})
keys = self.test_job.host_list[0]['host_keys']
self.assertEqual(keys[0], '[localhost]:22022 fake-host-key')
# Test with no host keys
node['host_keys'] = []
host = self.test_job.getHostList({'nodes': [node],
'host_vars': {},
'vars': {},
'groups': [],
})[0]
self.test_job.nodeset = NodeSet.fromDict(nodeset)
self.test_job.prepareNodes({'nodes': [node],
'host_vars': {},
'vars': {},
'groups': [],
})
host = self.test_job.host_list[0]
self.assertEqual(host['host_keys'], [])
self.assertEqual(
host['host_vars']['ansible_ssh_common_args'],
'-o StrictHostKeyChecking=false')
def test_getHostList_shell_type(self):
def test_prepareNodes_shell_type(self):
# Test without shell type set
node = {'name': 'fake-host',
'label': 'fake-label',
'state': 'ready',
'cloud': 'fake',
'host_keys': ['fake-host-key'],
'interface_ip': 'localhost'}
host = self.test_job.getHostList({'nodes': [node],
'host_vars': {},
'vars': {},
'groups': [],
})[0]
nodeset = {
"name": "dummy-node",
"node_request_id": 0,
"nodes": [node],
"groups": [],
}
self.test_job.nodeset = NodeSet.fromDict(nodeset)
self.test_job.prepareNodes({'host_vars': {},
'vars': {},
'groups': [],
})
host = self.test_job.host_list[0]
self.assertNotIn('ansible_shell_type', host['host_vars'])
# Test with custom shell type set.
node['shell_type'] = 'cmd'
host = self.test_job.getHostList({'nodes': [node],
'host_vars': {},
'vars': {},
'groups': [],
})[0]
self.test_job.nodeset = NodeSet.fromDict(nodeset)
self.test_job.prepareNodes({'host_vars': {},
'vars': {},
'groups': [],
})
host = self.test_job.host_list[0]
self.assertIn('ansible_shell_type', host['host_vars'])
self.assertEqual(
host['host_vars']['ansible_shell_type'],
@ -978,6 +1005,7 @@ class TestExecutorExtraPackages(AnsibleZuulTestCase):
class TestVarSquash(BaseTestCase):
def test_squash_variables(self):
# Test that we correctly squash job variables
nodeset = NodeSet()
nodes = [
{'name': 'node1', 'host_vars': {
'host': 'node1_host',
@ -988,10 +1016,8 @@ class TestVarSquash(BaseTestCase):
'extra': 'node2_extra',
}},
]
groups = [
{'name': 'group1', 'nodes': ['node1']},
{'name': 'group2', 'nodes': ['node2']},
]
nodeset.addGroup(Group('group1', ['node1']))
nodeset.addGroup(Group('group2', ['node2']))
groupvars = {
'group1': {
'host': 'group1_host',
@ -1017,7 +1043,7 @@ class TestVarSquash(BaseTestCase):
'extra': 'extravar_extra',
}
out = squash_variables(
nodes, groups, jobvars, groupvars, extravars)
nodes, nodeset, jobvars, groupvars, extravars)
expected = {
'node1': {

11
tests/unit/test_web.py

@ -1026,6 +1026,17 @@ class TestWeb(BaseTestWeb):
'name': ['controller'],
'state': 'unknown'
}],
'nodeset': {
'groups': [],
'name': '',
'nodes': [
{'aliases': [],
'comment': None,
'hold_job': None,
'id': None,
'label': 'label1',
'name': 'controller',
'state': 'unknown'}]},
'override_branch': None,
'override_checkout': None,
'repo_state': {},

3
zuul/executor/client.py

@ -59,7 +59,7 @@ class ExecutorClient(object):
"with dependent changes %s",
job, uuid, nodeset, item.change, dependent_changes)
params = zuul.executor.common.construct_gearman_params(
params = zuul.executor.common.construct_build_params(
uuid, self.sched, nodeset,
job, item, pipeline, dependent_changes, merger_items,
redact_secrets_and_keys=False)
@ -102,7 +102,6 @@ class ExecutorClient(object):
# Store the NodeRequest ID in the job arguments, so we can look it up
# on the executor side to lock the nodes.
params["nodeset"] = nodeset.toDict()
node_request = build.build_set.getJobNodeRequest(job.name)
if node_request:
params["noderequest_id"] = node_request.id

11
zuul/executor/common.py

@ -17,12 +17,12 @@ import os
from zuul.lib import strings
def construct_gearman_params(uuid, sched, nodeset, job, item, pipeline,
dependent_changes=[], merger_items=[],
redact_secrets_and_keys=True):
def construct_build_params(uuid, sched, nodeset, job, item, pipeline,
dependent_changes=[], merger_items=[],
redact_secrets_and_keys=True):
"""Returns a list of all the parameters needed to build a job.
These parameters may be passed to zuul-executors (via gearman) to perform
These parameters may be passed to zuul-executors (via ZK) to perform
the job itself.
Alternatively they contain enough information to load into another build
@ -124,6 +124,8 @@ def construct_gearman_params(uuid, sched, nodeset, job, item, pipeline,
params['cleanup_playbooks'] = [make_playbook(x)
for x in job.cleanup_run]
# TODO(corvus): Remove nodes and groups since they're included in
# nodeset
nodes = []
for node in nodeset.getNodes():
n = node.toDict()
@ -131,6 +133,7 @@ def construct_gearman_params(uuid, sched, nodeset, job, item, pipeline,
nodes.append(n)
params['nodes'] = nodes
params['groups'] = [group.toDict() for group in nodeset.getGroups()]
params["nodeset"] = nodeset.toDict()
params['ssh_keys'] = []
if pipeline.post_review:
if redact_secrets_and_keys:

130
zuul/executor/server.py

@ -753,8 +753,7 @@ class DeduplicateQueue(object):
self.condition.release()
def squash_variables(nodes, groups, jobvars, groupvars,
extravars):
def squash_variables(nodes, nodeset, jobvars, groupvars, extravars):
"""Combine the Zuul job variable parameters into a hostvars dictionary.
This is used by the executor when freezing job variables. It
@ -763,10 +762,9 @@ def squash_variables(nodes, groups, jobvars, groupvars,
therefore group vars and extra vars can be combined in such a way
to present a single hierarchy of variables visible to each host).
:param list nodes: A list of node dictionaries (as supplied by
the executor client)
:param dict groups: A list of group dictionaries (as supplied by
the executor client)
:param list nodes: A list of node dictionaries (as returned by
getHostList)
:param Nodeset nodeset: A nodeset (used for group membership).
:param dict jobvars: A dictionary corresponding to Zuul's job.vars.
:param dict groupvars: A dictionary keyed by group name with a value of
a dictionary of variables for that group.
@ -781,18 +779,18 @@ def squash_variables(nodes, groups, jobvars, groupvars,
# Zuul runs ansible with the default hash behavior of 'replace';
# this means we don't need to deep-merge dictionaries.
groups = sorted(nodeset.getGroups(), key=lambda g: g.name)
for node in nodes:
hostname = node['name']
ret[hostname] = {}
# group 'all'
ret[hostname].update(jobvars)
# group vars
groups = sorted(groups, key=lambda g: g['name'])
if 'all' in groupvars:
ret[hostname].update(groupvars.get('all', {}))
for group in groups:
if hostname in group['nodes']:
ret[hostname].update(groupvars.get(group['name'], {}))
if hostname in group.nodes:
ret[hostname].update(groupvars.get(group.name, {}))
# host vars
ret[hostname].update(node['host_vars'])
# extra vars
@ -818,16 +816,16 @@ def make_setup_inventory_dict(nodes, hostvars):
return inventory
def is_group_var_set(name, host, args):
for group in args['groups']:
if host in group['nodes']:
group_vars = args['group_vars'].get(group['name'], {})
def is_group_var_set(name, host, nodeset, args):
for group in nodeset.getGroups():
if host in group.nodes:
group_vars = args['group_vars'].get(group.name, {})
if name in group_vars:
return True
return False
def make_inventory_dict(nodes, groups, hostvars, remove_keys=None):
def make_inventory_dict(nodes, nodeset, hostvars, remove_keys=None):
hosts = {}
for node in nodes:
node_hostvars = hostvars[node['name']].copy()
@ -851,16 +849,16 @@ def make_inventory_dict(nodes, groups, hostvars, remove_keys=None):
}
}
for group in groups:
for group in nodeset.getGroups():
if 'children' not in inventory['all']:
inventory['all']['children'] = dict()
group_hosts = {}
for node_name in group['nodes']:
for node_name in group.nodes:
group_hosts[node_name] = None
inventory['all']['children'].update({
group['name']: {
group.name: {
'hosts': group_hosts,
}})
@ -1759,9 +1757,9 @@ class AnsibleJob(object):
"Could not decode json from {logfile}".format(
logfile=json_output))
def getHostList(self, args):
def getHostList(self, args, nodes):
hosts = []
for node in args['nodes']:
for node in nodes:
# NOTE(mordred): This assumes that the nodepool launcher
# and the zuul executor both have similar network
# characteristics, as the launcher will do a test for ipv6
@ -1770,9 +1768,9 @@ class AnsibleJob(object):
# set to True in the clouds.yaml for a cloud if this
# results in the wrong thing being in interface_ip
# TODO(jeblair): Move this notice to the docs.
for name in node['name']:
ip = node.get('interface_ip')
port = node.get('connection_port', node.get('ssh_port', 22))
for name in node.name:
ip = node.interface_ip
port = node.connection_port
host_vars = args['host_vars'].get(name, {}).copy()
check_varnames(host_vars)
host_vars.update(dict(
@ -1780,18 +1778,18 @@ class AnsibleJob(object):
ansible_user=self.executor_server.default_username,
ansible_port=port,
nodepool=dict(
label=node.get('label'),
az=node.get('az'),
cloud=node.get('cloud'),
provider=node.get('provider'),
region=node.get('region'),
host_id=node.get('host_id'),
external_id=node.get('external_id'),
interface_ip=node.get('interface_ip'),
public_ipv4=node.get('public_ipv4'),
private_ipv4=node.get('private_ipv4'),
public_ipv6=node.get('public_ipv6'),
private_ipv6=node.get('private_ipv6'))))
label=node.label,
az=node.az,
cloud=node.cloud,
provider=node.provider,
region=node.region,
host_id=node.host_id,
external_id=getattr(node, 'external_id', None),
interface_ip=node.interface_ip,
public_ipv4=node.public_ipv4,
private_ipv4=node.private_ipv4,
public_ipv6=node.public_ipv6,
private_ipv6=node.private_ipv6)))
# Ansible >=2.8 introduced "auto" as an
# ansible_python_interpreter argument that looks up
@ -1804,15 +1802,15 @@ class AnsibleJob(object):
# user control.
api = 'ansible_python_interpreter'
if (api not in args['vars'] and
not is_group_var_set(api, name, args)):
python = node.get('python_path', 'auto')
not is_group_var_set(api, name, self.nodeset, args)):
python = getattr(node, 'python_path', 'auto')
host_vars.setdefault(api, python)
username = node.get('username')
username = node.username
if username:
host_vars['ansible_user'] = username
connection_type = node.get('connection_type')
connection_type = node.connection_type
if connection_type:
host_vars['ansible_connection'] = connection_type
if connection_type == "winrm":
@ -1835,19 +1833,19 @@ class AnsibleJob(object):
self.winrm_read_timeout
elif connection_type == "kubectl":
host_vars['ansible_kubectl_context'] = \
node.get('kubectl_context')
getattr(node, 'kubectl_context', None)
shell_type = node.get('shell_type')
shell_type = getattr(node, 'shell_type', None)
if shell_type:
host_vars['ansible_shell_type'] = shell_type
host_keys = []
for key in node.get('host_keys', []):
for key in getattr(node, 'host_keys', []):
if port != 22:
host_keys.append("[%s]:%s %s" % (ip, port, key))
else:
host_keys.append("%s %s" % (ip, key))
if not node.get('host_keys'):
if not getattr(node, 'host_keys', None):
host_vars['ansible_ssh_common_args'] = \
'-o StrictHostKeyChecking=false'
@ -2295,32 +2293,33 @@ class AnsibleJob(object):
def prepareNodes(self, args):
# Returns the zuul.resources ansible variable for later user
# Used to remove resource nodes from the inventory
resources_nodes = []
# The (non-resource) nodes we want to keep in the inventory
inventory_nodes = []
# The zuul.resources ansible variable
zuul_resources = {}
for node in args['nodes']:
if node.get('connection_type') in (
for node in self.nodeset.getNodes():
if node.connection_type in (
'namespace', 'project', 'kubectl'):
# TODO: decrypt resource data using scheduler key
data = node['connection_port']
data = node.connection_port
# Setup kube/config file
self.prepareKubeConfig(self.jobdir, data)
# Convert connection_port in kubectl connection parameters
node['connection_port'] = None
node['kubectl_namespace'] = data['namespace']
node['kubectl_context'] = data['context_name']
node.connection_port = None
node.kubectl_namespace = data['namespace']
node.kubectl_context = data['context_name']
# Add node information to zuul.resources
zuul_resources[node['name'][0]] = {
zuul_resources[node.name[0]] = {
'namespace': data['namespace'],
'context': data['context_name'],
}
if node['connection_type'] in ('project', 'namespace'):
if node.connection_type in ('project', 'namespace'):
# Project are special nodes that are not the inventory
resources_nodes.append(node)
pass
else:
inventory_nodes.append(node)
# Add the real pod name to the resources_var
zuul_resources[node['name'][0]]['pod'] = data['pod']
zuul_resources[node.name[0]]['pod'] = data['pod']
fwd = KubeFwd(zuul_event_id=self.zuul_event_id,
build=self.build_request.uuid,
@ -2331,18 +2330,17 @@ class AnsibleJob(object):
try:
fwd.start()
self.port_forwards.append(fwd)
zuul_resources[node['name'][0]]['stream_port'] = \
zuul_resources[node.name[0]]['stream_port'] = \
fwd.port
except Exception:
self.log.exception("Unable to start port forward:")
self.log.error("Kubectl and socat are required for "
"streaming logs")
else:
# A normal node to include in inventory
inventory_nodes.append(node)
# Remove resource node from nodes list
for node in resources_nodes:
args['nodes'].remove(node)
self.host_list = self.getHostList(args)
self.host_list = self.getHostList(args, inventory_nodes)
with open(self.jobdir.known_hosts, 'w') as known_hosts:
for node in self.host_list:
@ -2356,8 +2354,8 @@ class AnsibleJob(object):
# Check the group and extra var names for safety; they'll get
# merged later
for group in args['groups']:
group_vars = args['group_vars'].get(group['name'], {})
for group in self.nodeset.getGroups():
group_vars = args['group_vars'].get(group.name, {})
check_varnames(group_vars)
check_varnames(args['extra_vars'])
@ -2390,7 +2388,7 @@ class AnsibleJob(object):
}
host_list = self.host_list + [localhost]
self.original_hostvars = squash_variables(
host_list, args['groups'], all_vars,
host_list, self.nodeset, all_vars,
args['group_vars'], args['extra_vars'])
def loadFrozenHostvars(self):
@ -2426,9 +2424,8 @@ class AnsibleJob(object):
def writeDebugInventory(self):
# This file is unused by Zuul, but the base jobs copy it to logs
# for debugging, so let's continue to put something there.
args = self.arguments
inventory = make_inventory_dict(
self.host_list, args['groups'], self.original_hostvars)
self.host_list, self.nodeset, self.original_hostvars)
inventory['all']['vars']['zuul'] = self.zuul_vars
with open(self.jobdir.inventory, 'w') as inventory_yaml:
@ -2449,9 +2446,8 @@ class AnsibleJob(object):
default_flow_style=False))
def writeInventory(self, jobdir_playbook, hostvars):
args = self.arguments
inventory = make_inventory_dict(
self.host_list, args['groups'], hostvars,
self.host_list, self.nodeset, hostvars,
remove_keys=jobdir_playbook.secrets_keys)
with open(jobdir_playbook.inventory, 'w') as inventory_yaml:

5
zuul/model.py

@ -658,7 +658,7 @@ class Node(ConfigObject):
self._state = data['state']
keys = []
for k, v in data.items():
if k == 'state':
if k in ['state', 'name', 'aliases']:
continue
keys.append(k)
setattr(self, k, v)
@ -666,7 +666,8 @@ class Node(ConfigObject):
@classmethod
def fromDict(cls, data):
node = cls(data["name"], data["label"])
aliases = data.get('aliases', [])
node = cls([data["name"]] + aliases, data["label"])
node.updateFromDict(data)
return node

2
zuul/rpclistener.py

@ -509,7 +509,7 @@ class RPCListener(RPCListenerBase):
nodeset = job.nodeset
job.setBase(tenant.layout)
uuid = '0' * 32
params = zuul.executor.common.construct_gearman_params(
params = zuul.executor.common.construct_build_params(
uuid, self.sched, nodeset,
job, item, pipeline)
gear_job.sendWorkComplete(json.dumps(params, cls=ZuulJSONEncoder))

Loading…
Cancel
Save