#!/usr/bin/python # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Library General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # Copyright 2005 Duke University # Parts Copyright 2007 Red Hat, Inc """YYOOM: a package management utility Using Yum API instead of /usr/bin/yum provides several interesting capabilities, some of which we are desperate to use, including: - installing and removing packages in same transaction; - JSON output. """ import argparse import collections import functools import json import logging import os import pkg_resources import subprocess import sys import yum from contextlib import contextmanager LOG = logging.getLogger('yyoom') OUTPUT = None ACTION_TYPE_MAP = { yum.constants.TS_INSTALL: 'install', yum.constants.TS_TRUEINSTALL: 'install', yum.constants.TS_UPDATE: 'upgrade', yum.constants.TS_OBSOLETING: 'upgrade', yum.constants.TS_ERASE: 'erase', yum.constants.TS_OBSOLETED: 'erase', yum.constants.TS_UPDATED: 'erase', yum.constants.TS_FAILED: 'error' } def _extended_yum_raises(method): """Decorator to extend error messages when manipulating packages with yum. """ @functools.wraps(method) def wrapper(self, *args, **kwargs): try: return method(self, *args, **kwargs) except yum.Errors.YumBaseError as e: data = dict( method_name=method.__name__, args=[str(arg) for arg in args], kwargs='\n'.join(' %s: %s' % item for item in sorted(kwargs.items()))) details = yum.i18n._("\nDetails:\n" " method name: %(method_name)s\n" " arguments: %(args)s\n" " keyword arguments:\n%(kwargs)s") e.value += details % data raise return wrapper class _YyoomBase(yum.YumBase): def __init__(self, *args, **kwargs): """Reintroduced init to preset some settings. """ super(_YyoomBase, self).__init__(*args, **kwargs) self.setCacheDir(force=True) def _askForGPGKeyImport(self, po, userid, hexkeyid): """Tell yum to import GPG keys if needed. Fixes: https://bugs.launchpad.net/anvil/+bug/1210657 Fixes: https://bugs.launchpad.net/anvil/+bug/1218728 """ return True @_extended_yum_raises def install(self, po=None, **kwargs): return super(_YyoomBase, self).install(po, **kwargs) @_extended_yum_raises def remove(self, po=None, **kwargs): return super(_YyoomBase, self).remove(po, **kwargs) def _setup_output(): """Do some nasty manipulations with fds Yum internals may sometimes write to stdout, just out of a sudden. To prevent this output form interfering with our JSON, we save current stdout to other fd via os.dup, and replace fd 1 with /dev/null opened for writing. """ global OUTPUT # save current stdout for later use OUTPUT = os.fdopen(os.dup(sys.stdout.fileno()), 'wb') # close the stream sys.stdout.close() # open /dev/null -- all writes to stdout from now on will go there devnull_fd = os.open(os.devnull, os.O_WRONLY) if devnull_fd != 1: os.dup2(devnull_fd, 1) os.close(devnull_fd) sys.stdout = os.fdopen(1, 'w') def _write_output(data): """Dump given object as pretty json""" OUTPUT.write(json.dumps(data, indent=4, separators=(',', ': '), sort_keys=True) + '\n') def _action_type_from_code(action): """Return value according to action code in dictionary mapping Yum states. Yum has a mapping that sometimes really isn't that accurate enough for our needs, so make a mapping that will suit our needs instead. """ return ACTION_TYPE_MAP.get(action, 'other') def _package_info(pkg, **kwargs): if isinstance(pkg, basestring): result = dict(name=pkg, **kwargs) else: result = dict( name=pkg.name, epoch=pkg.epoch, version=pkg.version, release=pkg.release, provides=pkg.provides, repo=str(pkg.repo), arch=pkg.arch, **kwargs ) return result class _RPMCallback(yum.rpmtrans.RPMBaseCallback): """Listen to events from RPM transactions""" def event(self, package, action, te_current, te_total, ts_current, ts_total): pass def scriptout(self, package, msg): if not msg or not LOG.isEnabledFor(logging.INFO): return for line in msg.splitlines(): line = line.strip() if line: LOG.info("%s: %s", package, line) def errorlog(self, msg): LOG.error("%s", msg) def filelog(self, package, action): if not LOG.isEnabledFor(logging.INFO): return LOG.info("Performed %(action_type)s (code %(action)s) on %(package)s", dict(package=package, action=action, action_type=_action_type_from_code(action))) class _OutputtingRPMCallback(_RPMCallback): def __init__(self, skip_missing=False): _RPMCallback.__init__(self) self._skip_missing = skip_missing self._missing = [] def yyoom_post_transaction(self, base, _code): output = [] for txmbr in base.tsInfo: action_type = _action_type_from_code(txmbr.output_state) info = _package_info(txmbr.po, action_code=txmbr.output_state, action_type=action_type) output.append(info) _write_output(output + self._missing) def yyoom_on_missing_package(self, pkg_req): if not self._skip_missing: raise yum.Errors.InstallError("The '%s' package not found." % pkg_req) req = pkg_resources.Requirement.parse(pkg_req) self._missing.append(_package_info(req.unsafe_name, action_type="missing", requirement=pkg_req, action=None)) def log_list(items, title=''): if not items: return if title: if not title.endswith(':'): title = str(title) + ":" LOG.info(title) for i in items: LOG.info(" - %s" % (i)) def build_yum_map(base): rpms = base.doPackageLists(ignore_case=True, showdups=True) all_rpms = [] for name in ('available', 'installed', 'extras', 'reinstall_available'): all_rpms.extend(getattr(rpms, name, [])) yum_map = collections.defaultdict(list) for rpm in all_rpms: for provides in rpm.provides: yum_map[provides[0]].append((rpm.version, rpm)) return dict(yum_map) def _find_packages(yum_map, pkg_req): """Find suitable packages in YUM packages map""" req = pkg_resources.Requirement.parse(pkg_req) matches = [rpm for (version, rpm) in yum_map.get(req.unsafe_name, []) if version in req] if matches: return matches def _run(yum_base, options): """Handler of `transaction` command Installs and erases packages, prints what was done in JSON """ log_list(options.erase, title='Erasing packages:') log_list(options.install, title='Installing packages:') with _transaction(yum_base, _OutputtingRPMCallback(options.skip_missing)) as cb: yum_map = build_yum_map(yum_base) # erase packages for pkg_name in options.erase or (): matches = _find_packages(yum_map, pkg_name) if matches is None: cb.yyoom_on_missing_package(pkg_name) else: installed_packages = yum_base.rpmdb.returnPackages() for package in matches: if package in installed_packages: yum_base.remove(package) # install packages for pkg_name in options.install or (): matches = _find_packages(yum_map, pkg_name) if matches is None: cb.yyoom_on_missing_package(pkg_name) else: # try to install package from preferred repositories, # if not found - install from default ones repo_matches = [m for m in matches if m.repoid in options.prefer_repo] matches = repo_matches if repo_matches else matches yum_base.install(max(matches)) def _list(yum_base, options): """Handler of `list` command""" pkgnarrow = options.what[0] if len(options.what) == 1 else 'all' lists = yum_base.doPackageLists(pkgnarrow=pkgnarrow, showdups=True) LOG.debug("Got packages for '%s': %s installed, %s available," "%s available for reinstall, %s extras", pkgnarrow, len(lists.installed), len(lists.available), len(lists.reinstall_available), len(lists.extras)) result = [] if 'installed' in options.what: result.extend(_package_info(pkg, status='installed') for pkg in lists.installed) if 'available' in options.what: result.extend(_package_info(pkg, status='available') for pkg in lists.available) result.extend(_package_info(pkg, status='available') for pkg in lists.reinstall_available) if 'extras' in options.what: result.extend(_package_info(pkg, status='installed') for pkg in lists.extras) _write_output(result) def _cleanall(yum_base, options): """Handler of `cleanall` command""" LOG.info("Running yum cleanup") code = sum(( _run_yum_api('packages clean up', yum_base.cleanPackages), _run_yum_api('headers clean up', yum_base.cleanHeaders), _run_yum_api('metadata clean up', yum_base.cleanMetadata), _run_yum_api('sqlite clean up', yum_base.cleanSqlite), _run_yum_api('rpm db clean up', yum_base.cleanRpmDB), )) return code def _builddep(yum_base, options): """Handler of `builddep` command Installs build dependencies for given package, prints what was done in JSON. """ LOG.info("Installing build dependencies for package %s", options.srpm) srpm = yum.packages.YumLocalPackage(yum_base.ts, options.srpm) with _transaction(yum_base, _OutputtingRPMCallback()): for req in srpm.requiresList(): LOG.debug('Processing dependency: %s', req) if not ( req.startswith('rpmlib(') or yum_base.returnInstalledPackagesByDep(req) ): pkg = yum_base.returnPackageByDep(req) LOG.debug('Installing %s', pkg) yum_base.install(pkg) def _parse_arguments(args): parser = argparse.ArgumentParser(prog=args[0]) parser.add_argument('--verbose', '-v', action='store_true', help='verbose operation') parser.add_argument('--log-file', '-l') parser.add_argument('--quiet', '-q', action='store_true') # TODO(imelnikov): --format subparsers = parser.add_subparsers(title='subcommands') # Arg: list parser_list = subparsers.add_parser('list', help='list packages') parser_list.add_argument('what', nargs='+', choices=('installed', 'available', 'extras'), help='what packages to list') parser_list.set_defaults(func=_list, operation='List') # Arg: transaction parser_run = subparsers.add_parser('transaction', help='install or remove packages') parser_run.add_argument('--install', '-i', action='append', metavar='package', help='install package') parser_run.add_argument('--erase', '-e', action='append', metavar='package', help='erase package') parser_run.add_argument('--skip-missing', action='store_true', default=False, help='do not fail on missing packages') parser_run.add_argument('--prefer-repo', '-r', action='append', metavar='repository', default=[], help='preferred repository name') parser_run.set_defaults(func=_run, operation='Transaction') # Arg: srpm parser_builddep = subparsers.add_parser( 'builddep', help='install build dependencies of srpm') parser_builddep.add_argument('srpm', help='path to source RPM package') parser_builddep.set_defaults(func=_builddep, operation='Builddep') # Arg: cleanall parser_cleanall = subparsers.add_parser('cleanall', help='clean all') parser_cleanall.set_defaults(func=_cleanall, operation='Cleanall') return parser.parse_args(args[1:] or ['--help']) def _setup_logging(verbose=True, logfile=None, quiet=False): """Initialize logging""" # setup logging -- put messages to stderr handler = logging.StreamHandler(sys.stderr) handler.setFormatter(logging.Formatter('YYOOM %(levelname)s: %(message)s')) if quiet: loglevel = logging.ERROR elif verbose: loglevel = logging.DEBUG else: loglevel = logging.INFO handler.setLevel(loglevel) root_logger = logging.getLogger() root_logger.addHandler(handler) root_logger.setLevel(logging.DEBUG) # Setup logfile if we have one if logfile: file_handler = logging.FileHandler(logfile) file_handler.setFormatter( logging.Formatter('%(asctime)s %(levelname)s: %(message)s')) file_handler.setLevel(logging.DEBUG) root_logger.addHandler(file_handler) def _run_yum_api(name, func, ok_codes=(0,), *args, **kwargs): code, results = func(*args, **kwargs) for msg in results: LOG.debug(msg) if code not in ok_codes: LOG.error('%s failed', name.title()) return code @contextmanager def _transaction(base, callback): """Manage Yum transactions Locks and unlocks Yum database, builds and processes transaction on __exit__. """ try: base.doLock() yield callback code = _run_yum_api('building transaction', base.buildTransaction, ok_codes=(0, 2)) failed = [] if code == 0: LOG.debug('Nothing to do') elif code == 2: base.processTransaction(rpmTestDisplay=callback, rpmDisplay=callback) failed = [txmbr for txmbr in base.tsInfo if txmbr.output_state == yum.constants.TS_FAILED] else: raise RuntimeError("Transaction failed: %s" % code) post_cb = getattr(callback, 'yyoom_post_transaction', None) if post_cb: post_cb(base, code) if failed: raise RuntimeError("Operation failed for %s" % ', '.join(txmbr.name for txmbr in failed)) finally: del base.tsInfo del base.ts base.doUnlock() def main(args): options = _parse_arguments(args) _setup_output() _setup_logging(options.verbose, options.log_file, options.quiet) LOG.debug('Running YOOM %s', options.operation) LOG.debug('Command line: %s', subprocess.list2cmdline(args)) try: yum_base = _YyoomBase() code = options.func(yum_base, options) or 0 except Exception as e: if options.verbose: LOG.exception("%s failed", options.operation) else: LOG.error("%s failed: %s", options.operation, e) code = 1 LOG.debug('Exiting, code=%s', code) return code if __name__ == '__main__': sys.exit(main(sys.argv))