fb913725e2
Also fix error message for missing defaults. Change-Id: I5d912c2bfb607c8388010ac6e39780be93073f5a
357 lines
13 KiB
Python
357 lines
13 KiB
Python
#!/usr/bin/env python
|
|
# Copyright (C) 2015 OpenStack, LLC.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
# Manage Jenkins plugin module registry.
|
|
|
|
import inspect
|
|
import logging
|
|
import operator
|
|
import pkg_resources
|
|
import sys
|
|
import types
|
|
from pkg_resources.extern.packaging.version import InvalidVersion
|
|
|
|
from six import PY2
|
|
|
|
from jenkins.plugins import PluginVersion
|
|
from jenkins_jobs.errors import JenkinsJobsException
|
|
|
|
# Public API of this module.
__all__ = ["ModuleRegistry"]

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Select the argument-inspection helper according to six.PY2. The conditional
# expression only touches ``inspect.getargspec`` when PY2 is true.
getargspec = inspect.getargspec if PY2 else inspect.getfullargspec
|
|
|
|
|
|
class ModuleRegistry(object):
    """Registry of Jenkins Job Builder modules, component handlers and macros.

    On construction every entry point in the ``jenkins_jobs.modules`` group is
    loaded and instantiated with this registry.  ``dispatch`` then routes a
    component definition either to a user macro or to a component function
    discovered through entry points.
    """

    # Both caches are class attributes, so they are shared by every instance.
    # Maps a component list type to ``{entry point name: loaded object}``.
    _entry_points_cache = {}
    # Maps a module entry point to its ``component_list_type``.
    _component_type_cache = {}

    def __init__(self, jjb_config, plugins_list=None):
        """Load all modules and record the known plugin versions.

        :arg jjb_config: configuration object, stored as ``self.jjb_config``
        :arg plugins_list: optional iterable of plugin info mappings (see
            ``_get_plugins_versions``); ``None`` means no versions are known
        """
        self.modules = []
        self.modules_by_component_type = {}
        self.handlers = {}
        self.jjb_config = jjb_config
        # Names of macros already reported as masking an inbuilt definition.
        self.masked_warned = {}
        self._macros = {}

        if plugins_list is None:
            self._plugin_version = {}
        else:
            # PluginVersion by short and long plugin name.
            self._plugin_version = self._get_plugins_versions(plugins_list)

        for entrypoint in pkg_resources.iter_entry_points(group="jenkins_jobs.modules"):
            Mod = entrypoint.load()
            mod = Mod(self)
            self.modules.append(mod)
            if mod.component_type is not None:
                self.modules_by_component_type[mod.component_type] = entrypoint

        # list.sort is stable, so one sort after loading yields the same order
        # as re-sorting after every append.
        self.modules.sort(key=operator.attrgetter("sequence"))

    @staticmethod
    def _get_plugins_versions(plugins_list):
        """Build a mapping of plugin short and long names to their version.

        :arg plugins_list: iterable of mappings that provide a ``version``
            item and optionally ``shortName`` / ``longName``
        :returns: dict keyed by every non-empty short and long name

        A version that ``pkg_resources.parse_version`` rejects is only
        reported with a warning; it is still stored in the result.
        """
        plugin_version = {}

        for plugin_info in plugins_list:
            short_name = plugin_info.get("shortName")
            long_name = plugin_info.get("longName")

            version = plugin_info["version"]
            if version == None:  # noqa: E711; should call PluginInfo.__eq__.
                # Ensure that plugin_info always has version and it is instance of PluginVersion.
                plugin_info["version"] = str(sys.maxsize)
                # Read the value back instead of reusing the string just
                # assigned, so whatever the mapping stored is what gets used.
                version = plugin_info["version"]

            try:
                pkg_resources.parse_version(version)
            except InvalidVersion:
                plugin_name = short_name or long_name
                if plugin_name:
                    logger.warning(
                        "Version %s for plugin %s does not conform to PEP440",
                        version,
                        plugin_name,
                    )
                else:
                    logger.warning("Version %s does not conform to PEP440", version)

            if short_name:
                plugin_version[short_name] = version
            if long_name:
                plugin_version[long_name] = version

        return plugin_version

    @staticmethod
    def _filter_kwargs(func, **kwargs):
        """Return only the keyword arguments that ``func`` lists as arguments.

        :arg func: callable inspected with ``getargspec``
        :returns: dict holding the entries of ``kwargs`` whose name appears in
            the ``args`` of the callable's argument spec
        """
        accepted = getargspec(func).args
        return {name: value for name, value in kwargs.items() if name in accepted}

    def get_plugin_version(self, plugin_name, alt_plugin_name=None, default=None):
        """Provide plugin version to be used from a module's impl of Base.gen_xml.

        The return value is a plugin version obtained directly from a running
        Jenkins instance.
        This allows module authors to differentiate generated XML output based
        on it.

        :arg str plugin_name: Either the shortName or longName of a plugin
          as seen in a query that looks like:
          ``http://<jenkins-hostname>/pluginManager/api/json?pretty&depth=2``
        :arg str alt_plugin_name: Alternative plugin name. Used if plugin_name
          is missing in plugin list.
        :arg str default: Default value. Used if plugin name is missing in
          plugin list.
        :returns: the version recorded for ``plugin_name``, else the one
          recorded for ``alt_plugin_name``, else ``PluginVersion(default)``
          when a default is given, else ``PluginVersion(str(sys.maxsize))``.

        During a 'test' run, it is possible to override JJB's query to a live
        Jenkins instance by passing it a path to a file containing a YAML list
        of dictionaries that mimics the plugin properties you want your test
        output to reflect::

          jenkins-jobs test -p /path/to/plugins-info.yaml

        Below is example YAML that might be included in
        /path/to/plugins-info.yaml.

        .. literalinclude:: /../../tests/cmd/fixtures/plugins-info.yaml

        """
        try:
            return self._plugin_version[plugin_name]
        except KeyError:
            pass
        if alt_plugin_name:
            try:
                return self._plugin_version[alt_plugin_name]
            except KeyError:
                pass
        if default is not None:
            return PluginVersion(default)
        # Assume latest version of plugin is preferred config format.
        return PluginVersion(str(sys.maxsize))

    def registerHandler(self, category, name, method):
        """Store ``method`` as the handler ``name`` within ``category``."""
        self.handlers.setdefault(category, {})[name] = method

    def getHandler(self, category, name):
        """Return the handler registered under ``category`` and ``name``.

        :raises KeyError: if the category or the name is not registered
        """
        return self.handlers[category][name]

    @property
    def macros(self):
        """Macros last passed to ``set_macros`` (an empty dict initially)."""
        return self._macros

    def set_macros(self, macros):
        """Replace the macros consulted by ``dispatch``."""
        self._macros = macros

    def amend_job_dicts(self, job_data_list):
        """Let every module amend every job until a full pass changes nothing.

        :arg job_data_list: iterable of objects exposing a ``data`` attribute,
            which is handed to each module's ``amend_job_dict``
        """
        while True:
            changed = False
            for job in job_data_list:
                for module in self.modules:
                    # Every module must see every job on each pass, so this
                    # loop deliberately has no short-circuit.
                    if module.amend_job_dict(job.data):
                        changed = True
            if not changed:
                break

    def get_component_list_type(self, entry_point):
        """Return ``component_list_type`` of the module behind ``entry_point``.

        The result is memoised in the class-level ``_component_type_cache``.
        """
        if entry_point in self._component_type_cache:
            return self._component_type_cache[entry_point]

        # pkg_resources.EntryPoint.load() is costly, cache it.
        component_list_type = entry_point.load().component_list_type
        logger.info("Caching type %s of %s", component_list_type, entry_point)
        self._component_type_cache[entry_point] = component_list_type

        return component_list_type

    def dispatch(
        self,
        component_type,
        xml_parent,
        component,
        template_data=None,
        job_data=None,
        component_pos=None,
    ):
        """This is a method that you can call from your implementation of
        Base.gen_xml or component.  It allows modules to define a type
        of component, and benefit from extensibility via Python
        entry points and Jenkins Job Builder :ref:`Macros <macro>`.

        :arg str component_type: the name of the component
          (e.g., `builder`)
        :arg xml_parent: the parent XML element
        :arg component: component definition; either a single-item dict of
          ``name: arguments`` or a plain component name
        :arg dict template_data: accepted for backward compatibility; this
          method does not read it
        :arg dict job_data: full job definition
        :arg component_pos: position passed as ``pos`` when context is added
          to an error raised while expanding a macro; also the fallback
          position for a component function error when ``component`` has no
          ``pos`` attribute
        :raises JenkinsJobsException: if the component type is unknown or no
          macro or entry point matches the component name

        See :py:class:`jenkins_jobs.modules.base.Base` for how to register
        components of a module.

        See the Publishers module for a simple example of how to use
        this method.
        """

        if component_type not in self.modules_by_component_type:
            raise JenkinsJobsException(
                "Unknown component type: '{0}'.".format(component_type)
            )

        entry_point = self.modules_by_component_type[component_type]
        component_list_type = self.get_component_list_type(entry_point)

        if isinstance(component, dict):
            # The component is a singleton dictionary of name: dict(args)
            name, component_data = next(iter(component.items()))
        else:
            # The component is a simple string name, eg "run-tests"
            name = component
            component_data = {}

        # Look for a component function defined in an entry point
        eps = self._entry_points_cache.get(component_list_type)
        if eps is None:
            eps = self._load_eps(component_list_type, component_type, entry_point, name)

        macro_dict = self.macros.get(component_type, {})
        macro = macro_dict.get(name)
        if macro:
            # A macro takes precedence over an entry point of the same name.
            try:
                self._dispatch_macro(
                    component_data,
                    component_type,
                    eps,
                    job_data,
                    macro,
                    name,
                    xml_parent,
                )
            except JenkinsJobsException as x:
                if component_pos is not None:
                    raise x.with_context(
                        f"While expanding {component_type} macro call {name!r}",
                        pos=component_pos,
                    )
                else:
                    raise
        elif name in eps:
            try:
                func = eps[name]
                kwargs = self._filter_kwargs(func, job_data=job_data)
                func(self, xml_parent, component_data, **kwargs)
            except JenkinsJobsException as x:
                raise x.with_context(
                    f"In {component_type} {name!r}",
                    # A plain str/dict component has no ``pos``; fall back so
                    # an AttributeError does not hide the real error.
                    pos=getattr(component, "pos", component_pos),
                )
        else:
            raise JenkinsJobsException(
                "Unknown entry point or macro '{0}' "
                "for component type: '{1}'.".format(name, component_type)
            )

    def _dispatch_macro(
        self, component_data, component_type, eps, job_data, macro, name, xml_parent
    ):
        """Expand ``macro`` for the component ``name`` under ``xml_parent``.

        Warns once per name when the macro hides an entry point of the same
        name.  The parameters handed to the macro are ``component_data``
        overlaid with ``job_data``.
        """
        if name in eps and name not in self.masked_warned:
            self.masked_warned[name] = True
            logger.warning(
                "You have a macro ('%s') defined for '%s' "
                "component type that is masking an inbuilt "
                "definition",
                name,
                component_type,
            )
        if component_data is None:
            component_data = {}
        params = {**component_data, **(job_data or {})}
        macro.dispatch_elements(self, xml_parent, component_data, job_data, params)

    def _load_eps(self, component_list_type, component_type, entry_point, name):
        """Collect, load and cache the entry points of one component list type.

        Entry points come from two places: functions of the module behind
        ``entry_point`` whose docstring starts with ``yaml:``, and the
        explicitly declared ``jenkins_jobs.<component_list_type>`` group.

        :arg component_list_type: cache key and suffix of the entry point group
        :arg component_type: component type named in the duplicate error
        :arg entry_point: module entry point whose functions are scanned
        :arg name: component being dispatched; kept for interface
            compatibility and not read here
        :returns: dict mapping entry point name to the loaded object
        :raises JenkinsJobsException: if two entry points share a name
        """
        logger.debug("Caching entrypoints for %s", component_list_type)
        module_eps = []
        # auto build entry points by inferring from base component_types
        mod = pkg_resources.EntryPoint(
            "__all__", entry_point.module_name, dist=entry_point.dist
        )
        Mod = mod.load()
        # dir() is used for the iteration order; __dict__ restricts the scan to
        # attributes stored directly on the loaded object.
        for attr_name in dir(Mod):
            func_ep = Mod.__dict__.get(attr_name)
            if not isinstance(func_ep, types.FunctionType):
                continue
            try:
                # extract entry point based on docstring
                name_line = func_ep.__doc__.split("\n")
                if not name_line[0].startswith("yaml:"):
                    logger.debug("Ignoring '%s' as an entry point", name_line)
                    continue
                ep_name = name_line[0].split(" ")[1]
            except (AttributeError, IndexError):
                # AttributeError by docstring not being defined as
                # a string to have split called on it.
                # IndexError raised by name_line not containing anything
                # after the 'yaml:' string.
                logger.debug(
                    "Not including func '%s' as an entry point", func_ep.__name__
                )
                continue

            module_eps.append(
                pkg_resources.EntryPoint(
                    ep_name,
                    entry_point.module_name,
                    dist=entry_point.dist,
                    attrs=(func_ep.__name__,),
                )
            )
            logger.debug(
                "Adding auto EP '%s=%s:%s'",
                ep_name,
                entry_point.module_name,
                func_ep.__name__,
            )
        # load from explicitly defined entry points
        module_eps.extend(
            pkg_resources.iter_entry_points(group=f"jenkins_jobs.{component_list_type}")
        )
        eps = {}
        for module_ep in module_eps:
            if module_ep.name in eps:
                # Report the entry point that is duplicated, not the component
                # that happened to trigger loading.
                raise JenkinsJobsException(
                    "Duplicate entry point found for component type: "
                    "'{0}', name: '{1}'".format(component_type, module_ep.name)
                )

            eps[module_ep.name] = module_ep.load()
        # cache both sets of entry points
        self._entry_points_cache[component_list_type] = eps
        logger.debug("Cached entry point group %s = %s", component_list_type, eps)
        return eps
|