Auto-generated output from python-black

Please review the following patch containing the code changes in the
repo. This is a transition patch consisting of the auto-generated
output of the python-black tool.

Change-Id: I2d2de71da8a105fb62b561899ae78441ddab4032
Signed-off-by: Thanh Ha <zxiiro@gmail.com>
Authored by Thanh Ha on 2019-08-23 09:14:39 -04:00; committed by Sorin Sbarnea
parent ead185134d
commit 4d90c187a9
89 changed files with 10446 additions and 9903 deletions
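
The commit message describes this patch as the auto-generated output of python-black. As an illustrative sketch only (the exact black version, command, and options used are not recorded in this commit), the quote normalization and line wrapping seen throughout the diff can be reproduced with black's Python API; with a recent black release this behaves as shown:

    import black  # assumes black is installed, e.g. via "pip install black"

    # A single-quoted, over-long assignment similar to the "before" side of
    # the doc/source conf.py hunk below.
    src = (
        "extensions = ['sphinx.ext.autodoc', 'sphinx.ext.coverage', "
        "'jenkins_jobs.sphinx.yaml', 'sphinxcontrib.programoutput']\n"
    )

    # FileMode() uses black's defaults: 88-character lines and double-quote
    # string normalization, which matches the changes in this patch.
    print(black.format_str(src, mode=black.FileMode()))
    # extensions = [
    #     "sphinx.ext.autodoc",
    #     "sphinx.ext.coverage",
    #     "jenkins_jobs.sphinx.yaml",
    #     "sphinxcontrib.programoutput",
    # ]

In practice the whole tree is more likely reformatted from the command line (for example "black ." at the repository root); that invocation is an assumption, not something stated in the commit.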


@@ -18,8 +18,8 @@ from jenkins_jobs.version import version_info as jenkins_jobs_version
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
sys.path.insert(0, os.path.abspath('../..'))
sys.path.insert(0, os.path.abspath('../../jenkins_jobs/modules'))
sys.path.insert(0, os.path.abspath("../.."))
sys.path.insert(0, os.path.abspath("../../jenkins_jobs/modules"))
# -- General configuration ----------------------------------------------------
@@ -28,25 +28,30 @@ sys.path.insert(0, os.path.abspath('../../jenkins_jobs/modules'))
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.coverage',
'jenkins_jobs.sphinx.yaml', 'sphinxcontrib.programoutput',
'sphinx.ext.extlinks', 'sphinx.ext.doctest']
extensions = [
"sphinx.ext.autodoc",
"sphinx.ext.coverage",
"jenkins_jobs.sphinx.yaml",
"sphinxcontrib.programoutput",
"sphinx.ext.extlinks",
"sphinx.ext.doctest",
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
templates_path = ["_templates"]
# The suffix of source filenames.
source_suffix = '.rst'
source_suffix = ".rst"
# The encoding of source files.
# source_encoding = 'utf-8-sig'
# The master toctree document.
master_doc = 'index'
master_doc = "index"
# General information about the project.
project = u'Jenkins Job Builder'
copyright = u'2012, Jenkins Job Builder Maintainers'
project = u"Jenkins Job Builder"
copyright = u"2012, Jenkins Job Builder Maintainers"
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
@@ -86,7 +91,7 @@ exclude_patterns = []
# show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
pygments_style = "sphinx"
# A list of ignored prefixes for module index sorting.
# modindex_common_prefix = []
@@ -96,7 +101,7 @@ pygments_style = 'sphinx'
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
html_theme = 'default'
html_theme = "default"
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
@@ -169,7 +174,7 @@ html_theme = 'default'
# html_file_suffix = None
# Output file base name for HTML help builder.
htmlhelp_basename = 'JenkinsJobBuilderdoc'
htmlhelp_basename = "JenkinsJobBuilderdoc"
# -- Options for LaTeX output -------------------------------------------------
@@ -177,10 +182,8 @@ htmlhelp_basename = 'JenkinsJobBuilderdoc'
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
# 'preamble': '',
}
@@ -189,8 +192,13 @@ latex_elements = {
# (source start file, target name, title, author, documentclass
# [howto/manual]).
latex_documents = [
('index', 'JenkinsJobBuilder.tex', u'Jenkins Job Builder Documentation',
u'Jenkins Job Builder Maintainers', 'manual'),
(
"index",
"JenkinsJobBuilder.tex",
u"Jenkins Job Builder Documentation",
u"Jenkins Job Builder Maintainers",
"manual",
)
]
# The name of an image file (relative to this directory) to place at the top of
@@ -225,8 +233,13 @@ linkcheck_timeout = 15
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
('index', 'jenkins-jobs', u'Jenkins Job Builder Documentation',
[u'Jenkins Job Builder Maintainers'], 1)
(
"index",
"jenkins-jobs",
u"Jenkins Job Builder Documentation",
[u"Jenkins Job Builder Maintainers"],
1,
)
]
# If true, show URL addresses after external links.
@@ -239,10 +252,15 @@ man_pages = [
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
('index', 'JenkinsJobBuilder', u'Jenkins Job Builder Documentation',
u'Jenkins Job Builder Maintainers',
'JenkinsJobBuilder', 'One line description of project.',
'Miscellaneous'),
(
"index",
"JenkinsJobBuilder",
u"Jenkins Job Builder Documentation",
u"Jenkins Job Builder Maintainers",
"JenkinsJobBuilder",
"One line description of project.",
"Miscellaneous",
)
]
# Documents to append as an appendix to all manuals.
@@ -254,6 +272,7 @@ texinfo_documents = [
# How to display URL addresses: 'footnote', 'no', or 'inline'.
# texinfo_show_urls = 'footnote'
extlinks = {'jenkins-wiki': ('https://wiki.jenkins.io/display/JENKINS/%s',
None),
'jenkins-plugins': ('https://plugins.jenkins.io/%s', None)}
extlinks = {
"jenkins-wiki": ("https://wiki.jenkins.io/display/JENKINS/%s", None),
"jenkins-plugins": ("https://plugins.jenkins.io/%s", None),
}


@@ -42,9 +42,9 @@ def getchunk(item):
# Subtract the matched portion from the original string
# if there was a match, otherwise set it to ""
item = (item[itemchunk.end():] if itemchunk else "")
item = item[itemchunk.end() :] if itemchunk else ""
# Don't return the match object, just the text
itemchunk = (itemchunk.group() if itemchunk else "")
itemchunk = itemchunk.group() if itemchunk else ""
return (itemchunk, item)
@@ -54,28 +54,28 @@ def cmp(a, b):
def alphanum(a, b):
a = a.name if hasattr(a, 'name') else str(a)
b = b.name if hasattr(b, 'name') else str(b)
a = a.name if hasattr(a, "name") else str(a)
b = b.name if hasattr(b, "name") else str(b)
n = 0
while (n == 0):
while n == 0:
# Get a chunk and the original string with the chunk subtracted
(ac, a) = getchunk(a)
(bc, b) = getchunk(b)
# Both items contain only letters
if (re_letters.match(ac) and re_letters.match(bc)):
if re_letters.match(ac) and re_letters.match(bc):
n = cmp(ac, bc)
else:
# Both items contain only numbers
if (re_numbers.match(ac) and re_numbers.match(bc)):
if re_numbers.match(ac) and re_numbers.match(bc):
n = cmp(int(ac), int(bc))
# item has letters and one item has numbers, or one item is empty
else:
n = cmp(ac, bc)
# Prevent deadlocks
if (n == 0):
if n == 0:
n = 1
return n
@@ -105,5 +105,5 @@ class AlphanumSort(object):
if __name__ == "__main__":
mylist = ['a2', 'a1', 'a10', 'a']
assert sorted(mylist, key=AlphanumSort) == ['a', 'a1', 'a2', 'a10']
mylist = ["a2", "a1", "a10", "a"]
assert sorted(mylist, key=AlphanumSort) == ["a", "a1", "a2", "a10"]


@@ -34,9 +34,7 @@ from jenkins_jobs.constants import MAGIC_MANAGE_STRING
from jenkins_jobs.parallel import concurrent
from jenkins_jobs import utils
__all__ = [
"JenkinsManager"
]
__all__ = ["JenkinsManager"]
logger = logging.getLogger(__name__)
@@ -44,22 +42,22 @@ _DEFAULT_TIMEOUT = object()
class JenkinsManager(object):
def __init__(self, jjb_config):
url = jjb_config.jenkins['url']
user = jjb_config.jenkins['user']
password = jjb_config.jenkins['password']
timeout = jjb_config.jenkins['timeout']
url = jjb_config.jenkins["url"]
user = jjb_config.jenkins["user"]
password = jjb_config.jenkins["password"]
timeout = jjb_config.jenkins["timeout"]
if timeout != _DEFAULT_TIMEOUT:
self.jenkins = jenkins.Jenkins(url, user, password, timeout)
else:
self.jenkins = jenkins.Jenkins(url, user, password)
self.cache = JobCache(jjb_config.jenkins['url'],
flush=jjb_config.builder['flush_cache'])
self.cache = JobCache(
jjb_config.jenkins["url"], flush=jjb_config.builder["flush_cache"]
)
self._plugins_list = jjb_config.builder['plugins_info']
self._plugins_list = jjb_config.builder["plugins_info"]
self._jobs = None
self._job_list = None
self._views = None
@@ -69,16 +67,15 @@ class JenkinsManager(object):
def _setup_output(self, output, item, config_xml=False):
output_dir = output
output_fn = os.path.join(output, item)
if '/' in item:
if "/" in item:
# in item folder
output_fn = os.path.join(output, os.path.normpath(item))
output_dir = os.path.dirname(output_fn)
# if in a folder, re-adding name to the directory here
if config_xml:
output_dir = os.path.join(
output_dir, os.path.basename(item))
output_fn = os.path.join(output_dir, 'config.xml')
output_dir = os.path.join(output_dir, os.path.basename(item))
output_fn = os.path.join(output_dir, "config.xml")
if output_dir != output:
logger.debug("Creating directory %s" % output_dir)
@@ -102,36 +99,43 @@ class JenkinsManager(object):
def job_list(self):
if self._job_list is None:
# python-jenkins uses 'fullname' for folder/name combination
self._job_list = set(job['fullname'] for job in self.jobs)
self._job_list = set(job["fullname"] for job in self.jobs)
return self._job_list
def _job_format(self, job_name):
# returns job name or url based on config option
if self._jjb_config.builder['print_job_urls']:
return self._jjb_config.jenkins['url'] + \
'/job/' + quote(
'/job/'.join(job_name.split('/')).encode('utf8')) + '/'
if self._jjb_config.builder["print_job_urls"]:
return (
self._jjb_config.jenkins["url"]
+ "/job/"
+ quote("/job/".join(job_name.split("/")).encode("utf8"))
+ "/"
)
else:
return job_name
def _view_format(self, view_name):
# returns job name or url based on config option
if self._jjb_config.builder['print_job_urls']:
parts = view_name.split('/')
return self._jjb_config.jenkins['url'] + \
''.join(['/job/' + item for item in parts[:-1]]) + \
'/view/' + parts[-1] + '/'
if self._jjb_config.builder["print_job_urls"]:
parts = view_name.split("/")
return (
self._jjb_config.jenkins["url"]
+ "".join(["/job/" + item for item in parts[:-1]])
+ "/view/"
+ parts[-1]
+ "/"
)
else:
return view_name
def update_job(self, job_name, xml):
if self.is_job(job_name):
logger.info("Reconfiguring jenkins job {0}".format(
self._job_format(job_name)))
logger.info(
"Reconfiguring jenkins job {0}".format(self._job_format(job_name))
)
self.jenkins.reconfig_job(job_name, xml)
else:
logger.info("Creating jenkins job {0}".format(
self._job_format(job_name)))
logger.info("Creating jenkins job {0}".format(self._job_format(job_name)))
self.jenkins.create_job(job_name, xml)
def is_job(self, job_name, use_cache=True):
@@ -143,7 +147,7 @@ class JenkinsManager(object):
def get_job_md5(self, job_name):
xml = self.jenkins.get_job_config(job_name)
return hashlib.md5(xml.encode('utf-8')).hexdigest()
return hashlib.md5(xml.encode("utf-8")).hexdigest()
def delete_job(self, job_name):
if self.is_job(job_name):
@@ -162,10 +166,10 @@ class JenkinsManager(object):
logger.warning(
"Unable to retrieve Jenkins Plugin Info from {0},"
" using default empty plugins info list.".format(
self.jenkins.server))
plugins_list = [{'shortName': '',
'version': '',
'longName': ''}]
self.jenkins.server
)
)
plugins_list = [{"shortName": "", "version": "", "longName": ""}]
else:
raise
logger.debug("Jenkins Plugin Info {0}".format(pformat(plugins_list)))
@@ -181,7 +185,7 @@ class JenkinsManager(object):
def is_managed(self, job_name):
xml = self.jenkins.get_job_config(job_name)
try:
out = XML.fromstring(xml.encode('utf-8'))
out = XML.fromstring(xml.encode("utf-8"))
description = out.find(".//description").text
return description.endswith(MAGIC_MANAGE_STRING)
except (TypeError, AttributeError):
@@ -202,18 +206,21 @@ class JenkinsManager(object):
for job in jobs:
# python-jenkins stores the folder and name as 'fullname'
# Check if the job was deleted when his parent folder was deleted
if job['fullname'] not in keep and \
self.is_job(job['fullname'], use_cache=False):
if self.is_managed(job['fullname']):
logger.info("Removing obsolete jenkins job {0}"
.format(job['fullname']))
self.delete_job(job['fullname'])
if job["fullname"] not in keep and self.is_job(
job["fullname"], use_cache=False
):
if self.is_managed(job["fullname"]):
logger.info(
"Removing obsolete jenkins job {0}".format(job["fullname"])
)
self.delete_job(job["fullname"])
deleted_jobs += 1
else:
logger.info("Not deleting unmanaged jenkins job %s",
job['fullname'])
logger.info(
"Not deleting unmanaged jenkins job %s", job["fullname"]
)
else:
logger.debug("Keeping job %s", job['fullname'])
logger.debug("Keeping job %s", job["fullname"])
return deleted_jobs
def delete_jobs(self, jobs):
@@ -221,15 +228,17 @@ class JenkinsManager(object):
logger.info("Removing jenkins job(s): %s" % ", ".join(jobs))
for job in jobs:
self.delete_job(job)
if(self.cache.is_cached(job)):
self.cache.set(job, '')
if self.cache.is_cached(job):
self.cache.set(job, "")
self.cache.save()
def delete_all_jobs(self):
jobs = self.get_jobs()
logger.info("Number of jobs to delete: %d", len(jobs))
script = ('for(job in jenkins.model.Jenkins.theInstance.getAllItems())'
' { job.delete(); }')
script = (
"for(job in jenkins.model.Jenkins.theInstance.getAllItems())"
" { job.delete(); }"
)
self.jenkins.run_script(script)
# Need to clear the JJB cache after deletion
self.cache.clear()
@@ -237,8 +246,9 @@ class JenkinsManager(object):
def changed(self, job):
md5 = job.md5()
changed = (self._jjb_config.builder['ignore_cache'] or
self.cache.has_changed(job.name, md5))
changed = self._jjb_config.builder["ignore_cache"] or self.cache.has_changed(
job.name, md5
)
if not changed:
logger.debug("'{0}' has not changed".format(job.name))
return changed
@@ -249,15 +259,20 @@ class JenkinsManager(object):
logger.debug("'{0}' does not currently exist".format(job.name))
return exists
def update_jobs(self, xml_jobs, output=None, n_workers=None,
existing_only=None, config_xml=False):
def update_jobs(
self,
xml_jobs,
output=None,
n_workers=None,
existing_only=None,
config_xml=False,
):
orig = time.time()
logger.info("Number of jobs generated: %d", len(xml_jobs))
xml_jobs.sort(key=AlphanumSort)
if (output and not hasattr(output, 'write') and
not os.path.isdir(output)):
if output and not hasattr(output, "write") and not os.path.isdir(output):
logger.debug("Creating directory %s" % output)
try:
os.makedirs(output)
@@ -267,11 +282,11 @@ class JenkinsManager(object):
if output:
# ensure only wrapped once
if hasattr(output, 'write'):
if hasattr(output, "write"):
output = utils.wrap_stream(output)
for job in xml_jobs:
if hasattr(output, 'write'):
if hasattr(output, "write"):
# `output` is a file-like object
logger.info("Job name: %s", job.name)
logger.debug("Writing XML to '{0}'".format(output))
@@ -289,39 +304,31 @@ class JenkinsManager(object):
output_fn = self._setup_output(output, job.name, config_xml)
logger.debug("Writing XML to '{0}'".format(output_fn))
with io.open(output_fn, 'w', encoding='utf-8') as f:
f.write(job.output().decode('utf-8'))
with io.open(output_fn, "w", encoding="utf-8") as f:
f.write(job.output().decode("utf-8"))
return xml_jobs, len(xml_jobs)
# Filter out the jobs that did not change
logging.debug('Filtering %d jobs for changed jobs',
len(xml_jobs))
logging.debug("Filtering %d jobs for changed jobs", len(xml_jobs))
step = time.time()
jobs = [job for job in xml_jobs
if self.changed(job)]
logging.debug("Filtered for changed jobs in %ss",
(time.time() - step))
jobs = [job for job in xml_jobs if self.changed(job)]
logging.debug("Filtered for changed jobs in %ss", (time.time() - step))
if existing_only:
# Filter out the jobs not already in the cache
logging.debug('Filtering %d jobs for existing jobs',
len(jobs))
logging.debug("Filtering %d jobs for existing jobs", len(jobs))
step = time.time()
jobs = [job for job in jobs
if self.exists(job)]
logging.debug("Filtered for existing jobs in %ss",
(time.time() - step))
jobs = [job for job in jobs if self.exists(job)]
logging.debug("Filtered for existing jobs in %ss", (time.time() - step))
if not jobs:
return [], 0
# Update the jobs
logging.debug('Updating jobs')
logging.debug("Updating jobs")
step = time.time()
p_params = [{'job': job} for job in jobs]
results = self.parallel_update_job(
n_workers=n_workers,
concurrent=p_params)
p_params = [{"job": job} for job in jobs]
results = self.parallel_update_job(n_workers=n_workers, concurrent=p_params)
logging.debug("Parsing results")
# generalize the result parsing, as a concurrent job always returns a
# list
@@ -336,15 +343,13 @@ class JenkinsManager(object):
self.cache.set(j_name, j_md5)
# write cache to disk
self.cache.save()
logging.debug("Updated %d jobs in %ss",
len(jobs),
time.time() - step)
logging.debug("Updated %d jobs in %ss", len(jobs), time.time() - step)
logging.debug("Total run took %ss", (time.time() - orig))
return jobs, len(jobs)
@concurrent
def parallel_update_job(self, job):
self.update_job(job.name, job.output().decode('utf-8'))
self.update_job(job.name, job.output().decode("utf-8"))
return (job.name, job.md5())
################
@@ -361,7 +366,7 @@ class JenkinsManager(object):
@property
def view_list(self):
if self._view_list is None:
self._view_list = set(view['name'] for view in self.views)
self._view_list = set(view["name"] for view in self.views)
return self._view_list
def get_views(self, cache=True):
@@ -389,7 +394,7 @@ class JenkinsManager(object):
for view in views:
self.delete_view(view)
if self.cache.is_cached(view):
self.cache.set(view, '')
self.cache.set(view, "")
self.cache.save()
def delete_all_views(self):
@@ -399,22 +404,30 @@ class JenkinsManager(object):
views.pop(0)
logger.info("Number of views to delete: %d", len(views))
for view in views:
self.delete_view(view['name'])
self.delete_view(view["name"])
# Need to clear the JJB cache after deletion
self.cache.clear()
def update_view(self, view_name, xml):
if self.is_view(view_name):
logger.info("Reconfiguring jenkins view {0}".format(
self._view_format(view_name)))
logger.info(
"Reconfiguring jenkins view {0}".format(self._view_format(view_name))
)
self.jenkins.reconfig_view(view_name, xml)
else:
logger.info("Creating jenkins view {0}".format(
self._view_format(view_name)))
logger.info(
"Creating jenkins view {0}".format(self._view_format(view_name))
)
self.jenkins.create_view(view_name, xml)
def update_views(self, xml_views, output=None, n_workers=None,
existing_only=None, config_xml=False):
def update_views(
self,
xml_views,
output=None,
n_workers=None,
existing_only=None,
config_xml=False,
):
orig = time.time()
logger.info("Number of views generated: %d", len(xml_views))
@@ -422,11 +435,11 @@ class JenkinsManager(object):
if output:
# ensure only wrapped once
if hasattr(output, 'write'):
if hasattr(output, "write"):
output = utils.wrap_stream(output)
for view in xml_views:
if hasattr(output, 'write'):
if hasattr(output, "write"):
# `output` is a file-like object
logger.info("View name: %s", view.name)
logger.debug("Writing XML to '{0}'".format(output))
@@ -444,39 +457,31 @@ class JenkinsManager(object):
output_fn = self._setup_output(output, view.name, config_xml)
logger.debug("Writing XML to '{0}'".format(output_fn))
with io.open(output_fn, 'w', encoding='utf-8') as f:
f.write(view.output().decode('utf-8'))
with io.open(output_fn, "w", encoding="utf-8") as f:
f.write(view.output().decode("utf-8"))
return xml_views, len(xml_views)
# Filter out the views that did not change
logging.debug('Filtering %d views for changed views',
len(xml_views))
logging.debug("Filtering %d views for changed views", len(xml_views))
step = time.time()
views = [view for view in xml_views
if self.changed(view)]
logging.debug("Filtered for changed views in %ss",
(time.time() - step))
views = [view for view in xml_views if self.changed(view)]
logging.debug("Filtered for changed views in %ss", (time.time() - step))
if existing_only:
# Filter out the jobs not already in the cache
logging.debug('Filtering %d views for existing jobs',
len(views))
logging.debug("Filtering %d views for existing jobs", len(views))
step = time.time()
views = [view for view in views
if self.exists(view)]
logging.debug("Filtered for existing views in %ss",
(time.time() - step))
views = [view for view in views if self.exists(view)]
logging.debug("Filtered for existing views in %ss", (time.time() - step))
if not views:
return [], 0
# Update the views
logging.debug('Updating views')
logging.debug("Updating views")
step = time.time()
p_params = [{'view': view} for view in views]
results = self.parallel_update_view(
n_workers=n_workers,
concurrent=p_params)
p_params = [{"view": view} for view in views]
results = self.parallel_update_view(n_workers=n_workers, concurrent=p_params)
logging.debug("Parsing results")
# generalize the result parsing, as a concurrent view always returns a
# list
@@ -491,13 +496,11 @@ class JenkinsManager(object):
self.cache.set(v_name, v_md5)
# write cache to disk
self.cache.save()
logging.debug("Updated %d views in %ss",
len(views),
time.time() - step)
logging.debug("Updated %d views in %ss", len(views), time.time() - step)
logging.debug("Total run took %ss", (time.time() - orig))
return views, len(views)
@concurrent
def parallel_update_view(self, view):
self.update_view(view.name, view.output().decode('utf-8'))
self.update_view(view.name, view.output().decode("utf-8"))
return (view.name, view.md5())


@@ -43,43 +43,45 @@ class JobCache(object):
def __init__(self, jenkins_url, flush=False):
cache_dir = self.get_cache_dir()
# One cache per remote Jenkins URL:
host_vary = re.sub(r'[^A-Za-z0-9\-\~]', '_', jenkins_url)
host_vary = re.sub(r"[^A-Za-z0-9\-\~]", "_", jenkins_url)
self.cachefilename = os.path.join(
cache_dir, 'cache-host-jobs-' + host_vary + '.yml')
cache_dir, "cache-host-jobs-" + host_vary + ".yml"
)
# generate named lockfile if none exists, and lock it
self._locked = self._lock()
if not self._locked:
raise errors.JenkinsJobsException(
"Unable to lock cache for '%s'" % jenkins_url)
"Unable to lock cache for '%s'" % jenkins_url
)
if flush or not os.path.isfile(self.cachefilename):
self.data = {}
else:
with io.open(self.cachefilename, 'r', encoding='utf-8') as yfile:
with io.open(self.cachefilename, "r", encoding="utf-8") as yfile:
self.data = yaml.load(yfile)
logger.debug("Using cache: '{0}'".format(self.cachefilename))
def _lock(self):
self._fastener = fasteners.InterProcessLock("%s.lock" %
self.cachefilename)
self._fastener = fasteners.InterProcessLock("%s.lock" % self.cachefilename)
return self._fastener.acquire(delay=1, max_delay=2, timeout=60)
def _unlock(self):
if getattr(self, '_locked', False):
if getattr(self, '_fastener', None) is not None:
if getattr(self, "_locked", False):
if getattr(self, "_fastener", None) is not None:
self._fastener.release()
self._locked = None
@staticmethod
def get_cache_dir():
home = os.path.expanduser('~')
if home == '~':
raise OSError('Could not locate home folder')
xdg_cache_home = os.environ.get('XDG_CACHE_HOME') or \
os.path.join(home, '.cache')
path = os.path.join(xdg_cache_home, 'jenkins_jobs')
home = os.path.expanduser("~")
if home == "~":
raise OSError("Could not locate home folder")
xdg_cache_home = os.environ.get("XDG_CACHE_HOME") or os.path.join(
home, ".cache"
)
path = os.path.join(xdg_cache_home, "jenkins_jobs")
if not os.path.isdir(path):
try:
os.makedirs(path)
@@ -111,9 +113,10 @@ class JobCache(object):
# use self references to required modules in case called via __del__
# write to tempfile under same directory and then replace to avoid
# issues around corruption such the process be killed
tfile = self._tempfile.NamedTemporaryFile(dir=self.get_cache_dir(),
delete=False)
tfile.write(self._yaml.dump(self.data).encode('utf-8'))
tfile = self._tempfile.NamedTemporaryFile(
dir=self.get_cache_dir(), delete=False
)
tfile.write(self._yaml.dump(self.data).encode("utf-8"))
# force contents to be synced on disk before overwriting cachefile
tfile.flush()
self._os.fsync(tfile.fileno())
@@ -131,10 +134,12 @@ class JobCache(object):
def __del__(self):
# check we initialized sufficiently in case called
# due to an exception occurring in the __init__
if getattr(self, 'data', None) is not None:
if getattr(self, "data", None) is not None:
try:
self.save()
except Exception as e:
self._logger.error("Failed to write to cache file '%s' on "
"exit: %s" % (self.cachefilename, e))
self._logger.error(
"Failed to write to cache file '%s' on "
"exit: %s" % (self.cachefilename, e)
)
self._unlock()


@@ -31,8 +31,7 @@ logger = logging.getLogger()
def __version__():
return "Jenkins Job Builder version: %s" % \
version.version_info.version_string()
return "Jenkins Job Builder version: %s" % version.version_info.version_string()
class JenkinsJobs(object):
@@ -58,17 +57,17 @@ class JenkinsJobs(object):
self.parser = create_parser()
self.options = self.parser.parse_args(args)
self.jjb_config = JJBConfig(self.options.conf,
config_section=self.options.section,
**kwargs)
self.jjb_config = JJBConfig(
self.options.conf, config_section=self.options.section, **kwargs
)
if not self.options.command:
self.parser.error("Must specify a 'command' to be performed")
if (self.options.log_level is not None):
self.options.log_level = getattr(logging,
self.options.log_level.upper(),
logger.getEffectiveLevel())
if self.options.log_level is not None:
self.options.log_level = getattr(
logging, self.options.log_level.upper(), logger.getEffectiveLevel()
)
logger.setLevel(self.options.log_level)
self._parse_additional()
@@ -84,50 +83,58 @@ class JenkinsJobs(object):
def _parse_additional(self):
self._set_config(self.jjb_config.builder, 'ignore_cache')
self._set_config(self.jjb_config.builder, 'flush_cache')
self._set_config(self.jjb_config.builder, 'update')
self._set_config(self.jjb_config.yamlparser, 'allow_empty_variables')
self._set_config(self.jjb_config.jenkins, 'section')
self._set_config(self.jjb_config.jenkins, 'user')
self._set_config(self.jjb_config.jenkins, 'password')
self._set_config(self.jjb_config.builder, "ignore_cache")
self._set_config(self.jjb_config.builder, "flush_cache")
self._set_config(self.jjb_config.builder, "update")
self._set_config(self.jjb_config.yamlparser, "allow_empty_variables")
self._set_config(self.jjb_config.jenkins, "section")
self._set_config(self.jjb_config.jenkins, "user")
self._set_config(self.jjb_config.jenkins, "password")
# Note: CLI options override config file options.
if getattr(self.options, 'update', None) is None:
self.options.update = self.jjb_config.builder.get('update')
if getattr(self.options, "update", None) is None:
self.options.update = self.jjb_config.builder.get("update")
if self.options.update is None:
self.options.update = 'all'
self.options.update = "all"
if getattr(self.options, 'plugins_info_path', None) is not None:
with io.open(self.options.plugins_info_path, 'r',
encoding='utf-8') as yaml_file:
if getattr(self.options, "plugins_info_path", None) is not None:
with io.open(
self.options.plugins_info_path, "r", encoding="utf-8"
) as yaml_file:
plugins_info = yaml.load(yaml_file)
if not isinstance(plugins_info, list):
self.parser.error("{0} must contain a Yaml list!".format(
self.options.plugins_info_path))
self.jjb_config.builder['plugins_info'] = plugins_info
self.parser.error(
"{0} must contain a Yaml list!".format(
self.options.plugins_info_path
)
)
self.jjb_config.builder["plugins_info"] = plugins_info
if getattr(self.options, 'path', None):
if hasattr(self.options.path, 'read'):
if getattr(self.options, "path", None):
if hasattr(self.options.path, "read"):
logger.debug("Input file is stdin")
if self.options.path.isatty():
if platform.system() == 'Windows':
key = 'CTRL+Z'
if platform.system() == "Windows":
key = "CTRL+Z"
else:
key = 'CTRL+D'
logger.warning("Reading configuration from STDIN. "
"Press %s to end input.", key)
key = "CTRL+D"
logger.warning(
"Reading configuration from STDIN. " "Press %s to end input.",
key,
)
self.options.path = [self.options.path]
else:
# take list of paths
self.options.path = self.options.path.split(os.pathsep)
do_recurse = (getattr(self.options, 'recursive', False) or
self.jjb_config.recursive)
do_recurse = (
getattr(self.options, "recursive", False)
or self.jjb_config.recursive
)
excludes = ([e for elist in self.options.exclude
for e in elist.split(os.pathsep)] or
self.jjb_config.excludes)
excludes = [
e for elist in self.options.exclude for e in elist.split(os.pathsep)
] or self.jjb_config.excludes
paths = []
for path in self.options.path:
if do_recurse and os.path.isdir(path):
@@ -139,8 +146,8 @@ class JenkinsJobs(object):
def execute(self):
extension_manager = extension.ExtensionManager(
namespace='jjb.cli.subcommands',
invoke_on_load=True,)
namespace="jjb.cli.subcommands", invoke_on_load=True
)
ext = extension_manager[self.options.command]
ext.obj.execute(self.options, self.jjb_config)
@@ -154,10 +161,11 @@ def main():
if sys.version_info[0] == 2:
import codecs
reload(sys) # noqa
sys.setdefaultencoding('utf-8')
sys.stdout = codecs.getwriter('utf8')(sys.stdout)
sys.stderr = codecs.getwriter('utf8')(sys.stderr)
sys.setdefaultencoding("utf-8")
sys.stdout = codecs.getwriter("utf8")(sys.stdout)
sys.stderr = codecs.getwriter("utf8")(sys.stderr)
# end of workaround
argv = sys.argv[1:]


@@ -22,8 +22,10 @@ from stevedore import extension
def __version__():
return "Jenkins Job Builder version: %s" % \
jenkins_jobs.version.version_info.version_string()
return (
"Jenkins Job Builder version: %s"
% jenkins_jobs.version.version_info.version_string()
)
def create_parser():
@@ -31,67 +33,78 @@ def create_parser():
"""
parser = argparse.ArgumentParser()
parser.add_argument(
'--conf',
dest='conf',
default=os.environ.get('JJB_CONF', None),
help="configuration file [JJB_CONF]")
"--conf",
dest="conf",
default=os.environ.get("JJB_CONF", None),
help="configuration file [JJB_CONF]",
)
parser.add_argument(
'-l',
'--log_level',
dest='log_level',
default=os.environ.get('JJB_LOG_LEVEL', 'info'),
help="log level (default: %(default)s) [JJB_LOG_LEVEL]")
"-l",
"--log_level",
dest="log_level",
default=os.environ.get("JJB_LOG_LEVEL", "info"),
help="log level (default: %(default)s) [JJB_LOG_LEVEL]",
)
parser.add_argument(
'--ignore-cache',
action='store_true',
dest='ignore_cache',
"--ignore-cache",
action="store_true",
dest="ignore_cache",
default=None,
help="ignore the cache and update the jobs anyhow (that will "
"only flush the specified jobs cache)")
"only flush the specified jobs cache)",
)
parser.add_argument(
'--flush-cache',
action='store_true',
dest='flush_cache',
"--flush-cache",
action="store_true",
dest="flush_cache",
default=None,
help="flush all the cache entries before updating")
help="flush all the cache entries before updating",
)
parser.add_argument(
'--version',
dest='version',
action='version',
"--version",
dest="version",
action="version",
version=__version__(),
help="show version")
help="show version",
)
parser.add_argument(
'--allow-empty-variables',
action='store_true',
dest='allow_empty_variables',
"--allow-empty-variables",
action="store_true",
dest="allow_empty_variables",
default=None,
help="Don\'t fail if any of the variables inside any string are "
"not defined, replace with empty string instead.")
help="Don't fail if any of the variables inside any string are "
"not defined, replace with empty string instead.",
)
parser.add_argument(
'--server', '-s',
dest='section',
default=os.environ.get('JJB_SECTION', 'jenkins'),
"--server",
"-s",
dest="section",
default=os.environ.get("JJB_SECTION", "jenkins"),
help="The Jenkins server ini section to use. Defaults to 'jenkins' "
"[JJB_SECTION]")
"[JJB_SECTION]",
)
parser.add_argument(
'--user', '-u',
default=os.environ.get('JJB_USER', None),
"--user",
"-u",
default=os.environ.get("JJB_USER", None),
help="The Jenkins user to use for authentication. This overrides "
"the user specified in the configuration file. [JJB_USER]")
"the user specified in the configuration file. [JJB_USER]",
)
parser.add_argument(
'--password', '-p',
default=os.environ.get('JJB_PASSWORD', None),
"--password",
"-p",
default=os.environ.get("JJB_PASSWORD", None),
help="Password or API token to use for authenticating towards Jenkins."
" This overrides the password specified in the configuration file."
" [JJB_PASSWORD]")
" [JJB_PASSWORD]",
)
subparser = parser.add_subparsers(
dest='command',
help="update, test, list or delete job")
dest="command", help="update, test, list or delete job"
)
extension_manager = extension.ExtensionManager(
namespace='jjb.cli.subcommands',
invoke_on_load=True,
namespace="jjb.cli.subcommands", invoke_on_load=True
)
def parse_subcommand_args(ext, subparser):


@@ -22,6 +22,7 @@ class BaseSubCommand(object):
"""Base class for Jenkins Job Builder subcommands, intended to allow
subcommands to be loaded as stevedore extensions by third party users.
"""
def __init__(self):
pass
@@ -52,16 +53,20 @@ class BaseSubCommand(object):
"""Add '--recursive' and '--exclude' arguments to given parser.
"""
parser.add_argument(
'-r', '--recursive',
action='store_true',
dest='recursive',
"-r",
"--recursive",
action="store_true",
dest="recursive",
default=False,
help="look for yaml files recursively")
help="look for yaml files recursively",
)
parser.add_argument(
'-x', '--exclude',
dest='exclude',
action='append',
"-x",
"--exclude",
dest="exclude",
action="append",
default=[],
help="paths to exclude when using recursive search, "
"uses standard globbing.")
"uses standard globbing.",
)


@@ -22,32 +22,33 @@ import jenkins_jobs.cli.subcommand.base as base
class DeleteSubCommand(base.BaseSubCommand):
def parse_args(self, subparser):
delete = subparser.add_parser('delete')
delete = subparser.add_parser("delete")
self.parse_option_recursive_exclude(delete)
delete.add_argument("name", help="name of job", nargs="+")
delete.add_argument(
'name',
help='name of job',
nargs='+')
delete.add_argument(
'-p', '--path',
"-p",
"--path",
default=None,
help="colon-separated list of paths to YAML files "
"or directories")
delete.add_argument(
'-j', '--jobs-only',
action='store_true', dest='del_jobs',
default=False,
help='delete only jobs'
help="colon-separated list of paths to YAML files " "or directories",
)
delete.add_argument(
'-v', '--views-only',
action='store_true', dest='del_views',
"-j",
"--jobs-only",
action="store_true",
dest="del_jobs",
default=False,
help='delete only views'
help="delete only jobs",
)
delete.add_argument(
"-v",
"--views-only",
action="store_true",
dest="del_views",
default=False,
help="delete only views",
)
def execute(self, options, jjb_config):
@@ -55,7 +56,8 @@ class DeleteSubCommand(base.BaseSubCommand):
if options.del_jobs and options.del_views:
raise JenkinsJobsException(
'"--views-only" and "--jobs-only" cannot be used together.')
'"--views-only" and "--jobs-only" cannot be used together.'
)
fn = options.path
registry = ModuleRegistry(jjb_config, builder.plugins_list)
@@ -64,8 +66,8 @@ class DeleteSubCommand(base.BaseSubCommand):
if fn:
parser.load_files(fn)
parser.expandYaml(registry, options.name)
jobs = [j['name'] for j in parser.jobs]
views = [v['name'] for v in parser.views]
jobs = [j["name"] for j in parser.jobs]
views = [v["name"] for v in parser.views]
else:
jobs = options.name
views = options.name


@@ -27,26 +27,30 @@ logger = logging.getLogger(__name__)
class DeleteAllSubCommand(base.BaseSubCommand):
def parse_args(self, subparser):
delete_all = subparser.add_parser(
'delete-all',
"delete-all",
help="delete *ALL* jobs from Jenkins server, including "
"those not managed by Jenkins Job Builder.")
"those not managed by Jenkins Job Builder.",
)
self.parse_option_recursive_exclude(delete_all)
delete_all.add_argument(
'-j', '--jobs-only',
action='store_true', dest='del_jobs',
"-j",
"--jobs-only",
action="store_true",
dest="del_jobs",
default=False,
help='delete only jobs'
help="delete only jobs",
)
delete_all.add_argument(
'-v', '--views-only',
action='store_true', dest='del_views',
"-v",
"--views-only",
action="store_true",
dest="del_views",
default=False,
help='delete only views'
help="delete only views",
)
def execute(self, options, jjb_config):
@@ -55,24 +59,26 @@ class DeleteAllSubCommand(base.BaseSubCommand):
reach = set()
if options.del_jobs and options.del_views:
raise JenkinsJobsException(
'"--views-only" and "--jobs-only" cannot be used together.')
'"--views-only" and "--jobs-only" cannot be used together.'
)
elif options.del_jobs and not options.del_views:
reach.add('jobs')
reach.add("jobs")
elif options.del_views and not options.del_jobs:
reach.add('views')
reach.add("views")
else:
reach.update(('jobs', 'views'))
reach.update(("jobs", "views"))
if not utils.confirm(
'Sure you want to delete *ALL* {} from Jenkins '
'server?\n(including those not managed by Jenkins '
'Job Builder)'.format(" AND ".join(reach))):
sys.exit('Aborted')
"Sure you want to delete *ALL* {} from Jenkins "
"server?\n(including those not managed by Jenkins "
"Job Builder)".format(" AND ".join(reach))
):
sys.exit("Aborted")
if 'jobs' in reach:
if "jobs" in reach:
logger.info("Deleting all jobs")
builder.delete_all_jobs()
if 'views' in reach:
if "views" in reach:
logger.info("Deleting all views")
builder.delete_all_views()


@@ -25,17 +25,18 @@ logger = logging.getLogger(__name__)
class GetPluginsInfoSubCommand(base.BaseSubCommand):
def parse_args(self, subparser):
plugins_info = subparser.add_parser(
'get-plugins-info',
help='get plugins info yaml by querying Jenkins server.')
"get-plugins-info", help="get plugins info yaml by querying Jenkins server."
)
plugins_info.add_argument(
'-o',