#!/usr/bin/env python # Copyright (C) 2012 OpenStack, LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. # Manage jobs in Jenkins server import os import hashlib import yaml import json import xml.etree.ElementTree as XML from xml.dom import minidom import jenkins import re import pkg_resources import logging import copy import itertools import fnmatch from jenkins_jobs.errors import JenkinsJobsException logger = logging.getLogger(__name__) MAGIC_MANAGE_STRING = "" def deep_format(obj, paramdict): """Apply the paramdict via str.format() to all string objects found within the supplied obj. Lists and dicts are traversed recursively.""" # YAML serialisation was originally used to achieve this, but that places # limitations on the values in paramdict - the post-format result must # still be valid YAML (so substituting-in a string containing quotes, for # example, is problematic). if isinstance(obj, str): try: ret = obj.format(**paramdict) except KeyError as exc: missing_key = exc.message desc = "%s parameter missing to format %s\nGiven: %s" % ( missing_key, obj, paramdict) raise JenkinsJobsException(desc) elif isinstance(obj, list): ret = [] for item in obj: ret.append(deep_format(item, paramdict)) elif isinstance(obj, dict): ret = {} for item in obj: ret[item.format(**paramdict)] = deep_format(obj[item], paramdict) else: ret = obj return ret def matches(what, where): """ Checks if the given string matches against the given list of glob patterns :arg str what: String that we want to test if matches :arg list where: list of glob patters to match """ for pattern in where: if re.match(fnmatch.translate(pattern), what): return True return False class YamlParser(object): def __init__(self, config=None): self.registry = ModuleRegistry(config) self.data = {} self.jobs = [] def parse(self, fn): data = yaml.load(open(fn)) if data: for item in data: cls, dfn = item.items()[0] group = self.data.get(cls, {}) if len(item.items()) > 1: n = None for k, v in item.items(): if k == "name": n = v break # Syntax error raise JenkinsJobsException("Syntax error, for item " "named '{0}'. Missing indent?" .format(n)) name = dfn['name'] group[name] = dfn self.data[cls] = group def getJob(self, name): job = self.data.get('job', {}).get(name, None) if not job: return job return self.applyDefaults(job) def getJobGroup(self, name): return self.data.get('job-group', {}).get(name, None) def getJobTemplate(self, name): job = self.data.get('job-template', {}).get(name, None) if not job: return job return self.applyDefaults(job) def applyDefaults(self, data): whichdefaults = data.get('defaults', 'global') defaults = self.data.get('defaults', {}).get(whichdefaults, {}) newdata = {} newdata.update(defaults) newdata.update(data) return newdata def generateXML(self, jobs_filter=None): changed = True while changed: changed = False for module in self.registry.modules: if hasattr(module, 'handle_data'): if module.handle_data(self): changed = True for job in self.data.get('job', {}).values(): if jobs_filter and not matches(job['name'], jobs_filter): logger.debug("Ignoring job {0}".format(job['name'])) continue logger.debug("XMLifying job '{0}'".format(job['name'])) job = self.applyDefaults(job) self.getXMLForJob(job) for project in self.data.get('project', {}).values(): logger.debug("XMLifying project '{0}'".format(project['name'])) for jobspec in project.get('jobs', []): if isinstance(jobspec, dict): # Singleton dict containing dict of job-specific params jobname, jobparams = jobspec.items()[0] if not isinstance(jobparams, dict): jobparams = {} else: jobname = jobspec jobparams = {} job = self.getJob(jobname) if job: # Just naming an existing defined job continue # see if it's a job group group = self.getJobGroup(jobname) if group: for group_jobspec in group['jobs']: if isinstance(group_jobspec, dict): group_jobname, group_jobparams = \ group_jobspec.items()[0] if not isinstance(group_jobparams, dict): group_jobparams = {} else: group_jobname = group_jobspec group_jobparams = {} job = self.getJob(group_jobname) if job: continue template = self.getJobTemplate(group_jobname) # Allow a group to override parameters set by a project d = {} d.update(project) d.update(jobparams) d.update(group) d.update(group_jobparams) # Except name, since the group's name is not useful d['name'] = project['name'] if template: self.getXMLForTemplateJob(d, template, jobs_filter) continue # see if it's a template template = self.getJobTemplate(jobname) if template: d = {} d.update(project) d.update(jobparams) self.getXMLForTemplateJob(d, template, jobs_filter) else: raise JenkinsJobsException("Failed to find suitable " "template named '{0}'" .format(jobname)) def getXMLForTemplateJob(self, project, template, jobs_filter=None): dimensions = [] for (k, v) in project.items(): if type(v) == list and k not in ['jobs']: dimensions.append(zip([k] * len(v), v)) # XXX somewhat hackish to ensure we actually have a single # pass through the loop if len(dimensions) == 0: dimensions = [(("", ""),)] checksums = set([]) for values in itertools.product(*dimensions): params = copy.deepcopy(project) params.update(values) expanded = deep_format(template, params) # Keep track of the resulting expansions to avoid # regenerating the exact same job. Whenever a project has # different values for a parameter and that parameter is not # used in the template, we ended up regenerating the exact # same job. # To achieve that we serialize the expanded template making # sure the dict keys are always in the same order. Then we # record the checksum in an unordered unique set which let # us guarantee a group of parameters will not be added a # second time. uniq = json.dumps(expanded, sort_keys=True) checksum = hashlib.md5(uniq).hexdigest() # Lookup the checksum if checksum not in checksums: # We also want to skip XML generation whenever the user did # not ask for that job. job_name = expanded.get('name') if jobs_filter and not matches(job_name, jobs_filter): continue logger.debug("Generating XML for template job {0}" " (params {1})".format( template['name'], params)) self.getXMLForJob(expanded) checksums.add(checksum) def getXMLForJob(self, data): kind = data.get('project-type', 'freestyle') data["description"] = data.get("description", "") + \ self.get_managed_string() for ep in pkg_resources.iter_entry_points( group='jenkins_jobs.projects', name=kind): Mod = ep.load() mod = Mod(self.registry) xml = mod.root_xml(data) self.gen_xml(xml, data) job = XmlJob(xml, data['name']) self.jobs.append(job) break def gen_xml(self, xml, data): for module in self.registry.modules: if hasattr(module, 'gen_xml'): module.gen_xml(self, xml, data) def get_managed_string(self): # The \n\n is not hard coded, because they get stripped if the # project does not otherwise have a description. return "\n\n" + MAGIC_MANAGE_STRING class ModuleRegistry(object): entry_points_cache = {} def __init__(self, config): self.modules = [] self.modules_by_component_type = {} self.handlers = {} self.global_config = config for entrypoint in pkg_resources.iter_entry_points( group='jenkins_jobs.modules'): Mod = entrypoint.load() mod = Mod(self) self.modules.append(mod) self.modules.sort(lambda a, b: cmp(a.sequence, b.sequence)) if mod.component_type is not None: self.modules_by_component_type[mod.component_type] = mod def registerHandler(self, category, name, method): cat_dict = self.handlers.get(category, {}) if not cat_dict: self.handlers[category] = cat_dict cat_dict[name] = method def getHandler(self, category, name): return self.handlers[category][name] def dispatch(self, component_type, parser, xml_parent, component, template_data={}): """This is a method that you can call from your implementation of Base.gen_xml or component. It allows modules to define a type of component, and benefit from extensibility via Python entry points and Jenkins Job Builder :ref:`Macros `. :arg string component_type: the name of the component (e.g., `builder`) :arg YAMLParser parser: the global YMAL Parser :arg Element xml_parent: the parent XML element :arg dict template_data: values that should be interpolated into the component definition See :py:class:`jenkins_jobs.modules.base.Base` for how to register components of a module. See the Publishers module for a simple example of how to use this method. """ if component_type not in self.modules_by_component_type: raise JenkinsJobsException("Unknown component type: " "'{0}'.".format(component_type)) component_list_type = self.modules_by_component_type[component_type] \ .component_list_type if isinstance(component, dict): # The component is a sigleton dictionary of name: dict(args) name, component_data = component.items()[0] if template_data: # Template data contains values that should be interpolated # into the component definition s = yaml.dump(component_data, default_flow_style=False) s = s.format(**template_data) component_data = yaml.load(s) else: # The component is a simple string name, eg "run-tests" name = component component_data = {} # Look for a component function defined in an entry point cache_key = '%s:%s' % (component_list_type, name) eps = ModuleRegistry.entry_points_cache.get(cache_key) if eps is None: eps = list(pkg_resources.iter_entry_points( group='jenkins_jobs.{0}'.format(component_list_type), name=name)) if len(eps) > 1: raise JenkinsJobsException( "Duplicate entry point found for component type: '{0}'," "name: '{1}'".format(component_type, name)) elif len(eps) == 1: ModuleRegistry.entry_points_cache[cache_key] = eps logger.debug("Cached entry point %s = %s", cache_key, ModuleRegistry.entry_points_cache[cache_key]) if len(eps) == 1: func = eps[0].load() func(parser, xml_parent, component_data) else: # Otherwise, see if it's defined as a macro component = parser.data.get(component_type, {}).get(name) if component: for b in component[component_list_type]: # Pass component_data in as template data to this function # so that if the macro is invoked with arguments, # the arguments are interpolated into the real defn. self.dispatch(component_type, parser, xml_parent, b, component_data) else: raise JenkinsJobsException("Unknown entry point or macro '{0}'" " for component type: '{1}'.". format(name, component_type)) class XmlJob(object): def __init__(self, xml, name): self.xml = xml self.name = name def md5(self): return hashlib.md5(self.output()).hexdigest() # Pretty printing ideas from # http://stackoverflow.com/questions/749796/pretty-printing-xml-in-python pretty_text_re = re.compile('>\n\s+([^<>\s].*?)\n\s+\g<1>