# Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import io import logging from functools import partial import yaml from .errors import JenkinsJobsException from .yaml_objects import BaseYamlObject from .expander import Expander, ParamsExpander, deprecated_yaml_tags, yaml_classes_list from .roots import root_adders logger = logging.getLogger(__name__) class Loader(yaml.Loader): @classmethod def empty(cls, jjb_config): return cls(io.StringIO(), jjb_config) def __init__(self, stream, jjb_config, source_path=None, anchors=None): super().__init__(stream) self.jjb_config = jjb_config self.source_path = source_path self._retain_anchors = jjb_config.yamlparser["retain_anchors"] if anchors: # Override default set by super class. self.anchors = anchors # Override the default composer to skip resetting the anchors at the # end of the current document. def compose_document(self): # Drop the DOCUMENT-START event. self.get_event() # Compose the root node. node = self.compose_node(None, None) # Drop the DOCUMENT-END event. self.get_event() return node def _with_stream(self, stream, source_path): return Loader(stream, self.jjb_config, source_path, self.anchors) def load_fp(self, fp): return self.load(fp) def load_path(self, path): return self.load(path.read_text(), source_path=path) def load(self, stream, source_path=None): loader = self._with_stream(stream, source_path) try: return loader.get_single_data() finally: loader.dispose() if self._retain_anchors: self.anchors.update(loader.anchors) def load_deprecated_yaml(tag, cls, loader, node): logger.warning("Tag %r is deprecated, switch to using %r", tag, cls.yaml_tag) return cls.from_yaml(loader, node) for cls in yaml_classes_list: yaml.add_constructor(cls.yaml_tag, cls.from_yaml, Loader) for tag, cls in deprecated_yaml_tags: yaml.add_constructor(tag, partial(load_deprecated_yaml, tag, cls), Loader) def is_stdin(path): return hasattr(path, "read") def enum_expanded_paths(path_list): visited_set = set() def real(path): real_path = path.resolve() if real_path in visited_set: logger.warning( "File '%s' is already added as '%s'; ignoring reference to avoid" " duplicating YAML definitions.", path, real_path, ) else: yield real_path visited_set.add(real_path) for path in path_list: if is_stdin(path): yield path elif path.is_dir(): for p in path.iterdir(): if p.suffix in {".yml", ".yaml"}: yield from real(p) else: yield from real(path) def load_files(config, roots, path_list): expander = Expander(config) params_expander = ParamsExpander(config) loader = Loader.empty(config) for path in enum_expanded_paths(path_list): if is_stdin(path): data = loader.load_fp(path) else: data = loader.load_path(path) if not isinstance(data, list): raise JenkinsJobsException( f"The topmost collection in file '{path}' must be a list," f" not a {type(data)}" ) for item in data: if not isinstance(item, dict): raise JenkinsJobsException( f"{path}: Topmost list should contain single-item dict," f" not a {type(item)}. Missing indent?" ) if len(item) != 1: raise JenkinsJobsException( f"{path}: Topmost dict should be single-item," f" but have keys {item.keys()}. Missing indent?" ) kind, contents = next(iter(item.items())) if kind.startswith("_"): continue if isinstance(contents, BaseYamlObject): contents = contents.expand(expander, params={}) try: adder = root_adders[kind] except KeyError: raise JenkinsJobsException( f"{path}: Unknown topmost element type : {kind!r}," f" Known are: {','.join(root_adders)}." ) adder(config, roots, expander, params_expander, contents)