# -*- coding: utf-8 -*-

#    Copyright 2014 Mirantis, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import datetime
import hashlib
import io
import logging
import os
import shutil
import subprocess
import tarfile
import yaml

from distutils import dir_util
from distutils.version import StrictVersion
from glob import glob

from mako.template import Template

from fuel_plugin_builder import errors


logger = logging.getLogger(__name__)


def is_executable(file_path):
    """Checks if file executable

    :param str file_path: path to the file
    :returns: True if file is executable, False if is not
    """
    return os.path.isfile(file_path) and os.access(file_path, os.X_OK)


def which(cmd):
    """Finds the path to an executable command

    If `cmd` contains a directory part and points to an executable file,
    it is returned as is. Otherwise every directory listed in the PATH
    environment variable is searched for an executable named `cmd`.

    :param str cmd: the name of the command or path
    :returns: None if there is no such command,
              if there is such command returns
              the path to the command
    """
    if os.path.dirname(cmd) and is_executable(cmd):
        return cmd

    # Fall back to the platform default search path when PATH is not set,
    # instead of failing with KeyError
    search_path = os.environ.get('PATH', os.defpath)
    for path in search_path.split(os.pathsep):
        exe_file = os.path.join(path, cmd)
        if is_executable(exe_file):
            return exe_file

    return None


def exec_cmd(cmd, cwd=None):
    """Execute command with logging.

    Output of stdout and stderr will be written in log.

    :param cmd: shell command
    :param cwd: string or None
    :raises: errors.ExecutedErrorNonZeroExitCode if the command
             exits with a non-zero code
    """
    logger.debug(u'Execute command "{0}"'.format(cmd))
    # stderr is merged into stdout so that both end up in the log
    # in the order they were produced
    child = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        shell=True,
        cwd=cwd)

    logger.debug(u'Stdout and stderr of command "{0}":'.format(cmd))
    for line in child.stdout:
        logger.debug(line.rstrip())

    child.wait()
    exit_code = child.returncode

    if exit_code != 0:
        raise errors.ExecutedErrorNonZeroExitCode(
            u'Shell command executed with "{0}" '
            'exit code: {1} '.format(exit_code, cmd))

    logger.debug(u'Command "{0}" successfully executed'.format(cmd))


def exec_piped_cmds(cmds, cwd=None):
    """Execute pipe of commands with logging.

    Commands are run one after another; the stdout of each command
    is passed as stdin to the next one.

    :param cmds: list of shell commands
    :type cmds: list
    :param cwd: current working directory
    :type cwd: string or None
    :raises: errors.ExecutedErrorNonZeroExitCode if any command
             exits with a non-zero code
    """
    logger.debug(u'Executing commands "{0}"'.format(" | ".join(cmds)))

    std_out = None
    for cmd in cmds:
        child = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            stdin=subprocess.PIPE,
            shell=True,
            cwd=cwd)

        # Output of the previous command (None for the first one)
        # becomes the input of the current command
        std_out, std_err = child.communicate(input=std_out)
        exit_code = child.returncode

        if exit_code != 0:
            logger.debug(u'Stderr of command "{0}":'.format(cmd))
            logger.debug(std_err)

            raise errors.ExecutedErrorNonZeroExitCode(
                u'Shell command executed with "{0}" '
                'exit code: {1} '.format(exit_code, cmd))

    logger.debug(u'Stdout of command "{0}":'.format(" | ".join(cmds)))
    logger.debug(std_out)
    logger.debug(
        u'Command "{0}" successfully executed'.format(" | ".join(cmds))
    )


def create_dir(dir_path):
    """Creates directory, including missing parent directories

    Does nothing if the directory already exists.

    :param dir_path: directory path
    """
    logger.debug(u'Creating directory %s', dir_path)
    if not os.path.isdir(dir_path):
        os.makedirs(dir_path)


def exists(path):
    """Checks if file exists

    Broken symbolic links are also reported as existing.

    :param str path: path to the file
    :returns: True if file exists, False if it does not
    """
    return os.path.lexists(path)


def basename(path):
    """Basename for path

    :param str path: path to the file
    :returns: str with filename
    """
    return os.path.basename(path)


def render_to_file(src, dst, params):
    """Render mako template and write it to specified file

    :param src: path to template
    :param dst: path where rendered template will be saved
    :param params: dict of parameters passed to the template
    """
    logger.debug(u'Render template from {0} to {1} with params: {2}'.format(
        src, dst, params))

    # NOTE(aroma): we use io.open because sometimes we ended up with
    # non-ascii chars in rendered template so must explicitly
    # converse content to 'utf-8' encoding before writing

    with io.open(src, 'r', encoding='utf-8') as f:
        template_file = f.read()

    # NOTE(aroma): 'render' in such configuration always
    # return unicode object as the result.
    # Rendering happens before the destination is opened so that
    # a failing template does not leave an empty file behind.
    rendered_file = Template(template_file).render(**params)

    with io.open(dst, 'w', encoding='utf-8') as f:
        f.write(rendered_file)


def render_files_in_dir(dir_path, params):
    """Renders all *.mako files and removes templates

    Each rendered file is written next to its template, with the
    '.mako' extension stripped and the template's permissions copied.

    :param str dir_path: path to the directory
    :param dict params: parameters for rendering
    """
    for root, _, files in os.walk(dir_path):
        for file_path in files:
            name, extension = os.path.splitext(file_path)
            if extension != '.mako':
                continue

            src_path = os.path.join(root, file_path)
            dst_path = os.path.join(root, name)
            render_to_file(src_path, dst_path, params)
            copy_file_permissions(src_path, dst_path)
            remove(src_path)


def copy_file_permissions(src, dst):
    """Copies file permissions

    :param str src: source file
    :param str dst: destination
    """
    shutil.copymode(src, dst)


def remove(path):
    """Remove file or directory

    Does nothing if the path does not exist. Symbolic links are
    removed themselves, their targets are left untouched.

    :param path: a file or directory to remove
    """
    logger.debug(u'Removing "%s"', path)

    if not os.path.lexists(path):
        return

    if os.path.isdir(path) and not os.path.islink(path):
        shutil.rmtree(path)
    else:
        os.remove(path)


def copy(src, dst):
    """Copy a given file or directory from one place to another.

    Rewrite already exists files.

    :param src: copy from
    :param dst: copy to
    """
    logger.debug(u'Copy from %s to %s', src, dst)

    if os.path.isdir(src):
        # dir_util.copy_tree use here instead of shutil.copytree because
        # it can overwrite existing folder and files. This is necessary
        # for our template combinations, e.g.: base and v1
        dir_util.copy_tree(src, dst, preserve_symlinks=True)
    else:
        shutil.copy(src, dst)


def copy_files_in_dir(src, dst):
    """Copies file in directory

    :param str src: source files (glob pattern)
    :param str dst: destination directory
    """
    logger.debug(u'Copy files in directory %s %s', src, dst)
    for f in glob(src):
        dst_path = os.path.join(dst, os.path.basename(f))
        copy(f, dst_path)


def move_files_in_dir(src, dst):
    """Move files or directories

    :param str src: source files or directories (glob pattern)
    :param str dst: destination directory
    """
    logger.debug(u'Move files to directory %s %s', src, dst)
    for f in glob(src):
        dst_path = os.path.join(dst, os.path.basename(f))
        shutil.move(f, dst_path)


def make_tar_gz(dir_path, tar_path, files_prefix):
    """Compress the file in tar.gz archive

    :param str dir_path: directory for archiving
    :param str tar_path: the name and path to the file
    :param str files_prefix: the directory in the tar files where all
                             of the files are allocated
    """
    logger.debug(u'Archive directory %s to file %s', dir_path, tar_path)
    # Context manager guarantees the archive is closed even if
    # adding files fails
    with tarfile.open(tar_path, 'w:gz') as tar:
        tar.add(dir_path, arcname=files_prefix)


def parse_yaml(path):
    """Parses yaml file

    :param str path: path to the file
    :returns: dict or list
    """
    with open(path) as f:
        # NOTE(review): yaml.load can construct arbitrary Python objects
        # and must not be used on untrusted input; consider switching
        # to yaml.safe_load once callers are confirmed not to rely
        # on custom tags
        return yaml.load(f)


def calculate_sha(file_path, chunk_size=2 ** 20):
    """Calculate file's checksum

    The file is read in chunks so that it is never loaded
    into memory as a whole.

    :param str file_path: file path
    :param int chunk_size: optional parameter, size of chunk
    :returns: SHA1 string
    """
    sha = hashlib.sha1()

    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            sha.update(chunk)

    return sha.hexdigest()


def calculate_checksums(dir_path):
    """Calculates checksums of files in the directory

    :param str dir_path: path to the directory
    :returns: list of dicts, where 'checksum' is SHA1,
              'file_path' is a relative path to the file
    """
    checksums = []
    for root, _, files in os.walk(dir_path):
        for file_path in files:
            full_path = os.path.join(root, file_path)
            rel_path = os.path.relpath(full_path, dir_path)

            checksums.append({
                'checksum': calculate_sha(full_path),
                'file_path': rel_path})

    return checksums


def create_checksums_file(dir_path, checksums_file):
    """Creates file with checksums

    Lines are sorted by file path, one '<checksum> <file_path>'
    entry per line.

    :param str dir_path: path to the directory for checksums calculation
    :param str checksums_file: path to the file where checksums are saved
    """
    checksums = calculate_checksums(dir_path)
    checksums_sorted = sorted(checksums, key=lambda c: c['file_path'])
    checksum_lines = [
        '{checksum} {file_path}\n'.format(**checksum)
        for checksum in checksums_sorted]

    with open(checksums_file, 'w') as f:
        f.writelines(checksum_lines)


def version_split_name_rpm(version):
    """Splits version string into a pair of version strings

    :param str version: version string accepted by StrictVersion
    :returns: tuple (major, minor), where major is the first two
              components of the version joined with a dot and minor
              is the given version string unchanged
    """
    version_tuple = StrictVersion(version).version
    major = '.'.join(map(str, version_tuple[0:2]))
    minor = version

    return (major, minor)


def get_current_year():
    """Returns current year

    :returns: str with the current year
    """
    return str(datetime.date.today().year)


def remove_by_mask(mask):
    """Deletes files by mask

    :param str mask: files mask
    """
    logger.debug(u'Remove files by mask %s', mask)
    for f in glob(mask):
        remove(f)


def read_if_exist(filename):
    """Read contents from filename

    :param str filename: path to the file
    :returns: str with contents of filename or empty string
              if the file does not exist
    """
    if not exists(filename):
        logger.debug('File not found. Skipping {0}'.format(filename))
        return ""

    with open(filename) as f:
        logger.debug('Reading file {0}'.format(filename))
        return f.read()