Image building tools for OpenStack
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

355 lines
12 KiB

# Copyright 2013 Hewlett-Packard Development Company, L.P.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import argparse
import collections
import errno
import logging
import os
import sys
import yaml
import diskimage_builder.logging_config
logger = logging.getLogger(__name__)
class MissingElementException(Exception):
class AlreadyProvidedException(Exception):
class MissingOSException(Exception):
class InvalidElementDir(Exception):
class Element(object):
"""An element"""
def _get_element_set(self, path):
"""Get element set from element-[deps|provides] file
:param path: path to element description
:return: the set of elements in the file, or a blank set if
the file is not found.
with open(path) as f:
lines = (line.strip() for line in f)
# Strip blanks, but do we want to strip comment lines
# too? No use case at the moment, and comments might
# break other things that poke at the element-* files.
lines = (line for line in lines if line)
return set(lines)
except IOError as e:
if e.errno == errno.ENOENT:
return set([])
def _make_rdeps(self, all_elements):
"""Make a list of reverse dependencies (who depends on us).
Only valid after _find_all_elements()
:param all_elements: dict as returned by _find_all_elements()
:return: nothing, but elements will have r_depends var
# note; deliberatly left out of __init__ so that accidental
# access without init raises error
self.r_depends = []
for name, element in all_elements.items():
if in element.depends:
def __init__(self, name, path):
"""A new element
:param name: The element name
:param path: Full path to element. element-deps and
element-provides files will be parsed
""" = name
self.path = path
# read the provides & depends files for this element into a
# set; if the element has them.
self.provides = self._get_element_set(
os.path.join(path, 'element-provides'))
self.depends = self._get_element_set(
os.path.join(path, 'element-deps'))
# Uncomment to see all elements and deps listed as they're found
# logger.debug("New element : %s", str(self))
def __eq__(self, other):
return ==
def __repr__(self):
def __str__(self):
return '%s p:<%s> d:<%s>' % (,
def _get_elements_dir():
if not os.environ.get('ELEMENTS_PATH'):
raise Exception("$ELEMENTS_PATH must be set.")
return os.environ['ELEMENTS_PATH']
def _expand_element_dependencies(user_elements, all_elements):
"""Expand user requested elements using element-deps files.
:param user_elements: iterable enumerating the elements a user requested
:param all_elements: Element object dictionary from find_all_elements
:return: a set containing the names of user_elements and all
dependent elements including any transitive dependencies.
final_elements = set(user_elements)
check_queue = collections.deque(user_elements)
provided = set()
provided_by = collections.defaultdict(list)
while check_queue:
# bug #1303911 - run through the provided elements first to avoid
# adding unwanted dependencies and looking for virtual elements
element = check_queue.popleft()
if element in provided:
elif element not in all_elements:
raise MissingElementException("Element '%s' not found" % element)
element_obj = all_elements[element]
element_deps = element_obj.depends
element_provides = element_obj.provides
# Check that we are not providing an element which has already
# been provided by someone else, and additionally save which
# elements provide another element
for provide in element_provides:
if provide in provided:
raise AlreadyProvidedException(
"%s: already provided by %s" %
(provide, provided_by[provide]))
check_queue.extend(element_deps - (final_elements | provided))
conflicts = set(user_elements) & provided
if conflicts:
"The following elements are already provided by another element")
for element in conflicts:
logger.error("%s : already provided by %s",
element, provided_by[element])
raise AlreadyProvidedException()
if "operating-system" not in provided:
raise MissingOSException("Please include an operating system element")
out = final_elements - provided
return([all_elements[element] for element in out])
def _find_all_elements(paths=None):
"""Build a dictionary Element() objects
Walk ELEMENTS_PATH and find all elements. Make an Element object
for each element we wish to consider. Note we process overrides
such that elements specified earlier in the ELEMENTS_PATH override
those seen later.
:param paths: A list of paths to find elements in. If None will
use ELEMENTS_PATH from environment
:return: a dictionary of all elements
all_elements = {}
# note we process the later entries *first*, so that earlier
# entries will override later ones. i.e. with
# ELEMENTS_PATH=path1:path2:path3
# we want the elements in "path1" to override "path3"
if not paths:
paths = list(reversed(_get_elements_dir().split(':')))
paths = list(reversed(paths.split(':')))
logger.debug("ELEMENTS_PATH is: %s", ":".join(paths))
for path in paths:
if not os.path.isdir(path):
raise InvalidElementDir("ELEMENTS_PATH entry '%s' "
"is not a directory " % path)
# In words : make a list of directories in "path". Since an
# element is a directory, this is our list of elements.
elements = [os.path.realpath(os.path.join(path, f))
for f in os.listdir(path)
if os.path.isdir(os.path.join(path, f))]
for element in elements:
# the element name is the last part of the full path in
# element (these are all directories, we know that from
# above)
name = os.path.basename(element)
new_element = Element(name, element)
if name in all_elements:
logger.warning("Element <%s> overrides <%s>",
new_element.path, all_elements[name].path)
all_elements[name] = new_element
# Now we have all the elements, make a call on each element to
# store it's reverse dependencies
for name, element in all_elements.items():
return all_elements
def _get_elements(elements, paths=None):
"""Return the canonical list of Element objects
This function returns Element objects. For exernal calls, use
get_elements which returns a simple tuple & list.
:param elements: user specified list of elements
:param paths: element paths, default to environment
all_elements = _find_all_elements(paths)
return _expand_element_dependencies(elements, all_elements)
def get_elements(elements, paths=None):
"""Return the canonical list of elements with their dependencies
.. note::
You probably do not want to use this! Elements that require
access to the list of all other elements should generally use
the environment variables exported by disk-image-create below.
:param elements: user specified elements
:param paths: Alternative ELEMENTS_PATH; default is to use from env
:return: A de-duplicated list of tuples [(element, path),
(element, path) ...] with all elements and their
dependents, including any transitive dependencies.
elements = _get_elements(elements, paths)
return [(, element.path) for element in elements]
def expand_dependencies(user_elements, element_dirs):
"""Deprecated method for expanding element dependencies.
.. warning::
DO NOT USE THIS FUNCTION. For compatibility reasons, this
function does not provide paths to the returned elements. This
means the caller must process override rules if two elements
with the same name appear in element_dirs
:param user_elements: iterable enumerating the elements a user requested
:param element_dirs: The ELEMENTS_PATH to process
:return: a set containing user_elements and all dependent
elements including any transitive dependencies.
logger.warning("expand_dependencies() deprecated, use get_elements")
elements = _get_elements(user_elements, element_dirs)
return set([ for element in elements])
def _output_env_vars(elements):
"""Output eval-able bash strings for IMAGE_ELEMENT vars
:param elements: list of Element objects to represent
# first the "legacy" environment variable that just lists the
# elements
print("export IMAGE_ELEMENT='%s'" %
' '.join([ for element in elements]))
# Then YAML
output = {}
for element in elements:
output[] = element.path
print("export IMAGE_ELEMENT_YAML='%s'" % yaml.safe_dump(output))
# Then bash array. Unfortunately, bash can't export array
# variables. So we take a compromise and produce an exported
# function that outputs the string to re-create the array.
# You can then simply do
# eval declare -A element_array=$(get_image_element_array)
# and you have it.
output = ""
for element in elements:
output += '[%s]=%s ' % (, element.path)
print("function get_image_element_array {\n"
" echo \"%s\"\n"
"export -f get_image_element_array;" % output)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('elements', nargs='+',
help='display dependencies of the given elements')
parser.add_argument('--env', '-e', action='store_true',
help=('Output eval-able bash strings for '
'IMAGE_ELEMENT variables'))
args = parser.parse_args(sys.argv[1:])
elements = _get_elements(args.elements)
if args.env:
# deprecated compatibility output; doesn't include paths.
print(' '.join([ for element in elements]))
return 0
if __name__ == "__main__":