1e4112d1d6
Sync from charm-helpers to update [service_user] config to use the service domain. The keystone charm currently creates two service users, one for the service domain (for v3 authentication), and the other for the default domain (for v2 authentication). The [service_user] config needs to use the service domain. Closes-Bug: #2026202 Change-Id: Ia1329a6c53cc4b532436751f0396149139a88172
328 lines
11 KiB
Python
328 lines
11 KiB
Python
# Copyright 2019-2021 Canonical Ltd
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Provide a subset of the ``python-apt`` module API.
|
|
|
|
Data collection is done through subprocess calls to ``apt-cache`` and
|
|
``dpkg-query`` commands.
|
|
|
|
The main purpose for this module is to avoid dependency on the
|
|
``python-apt`` python module.
|
|
|
|
The indicated python module is a wrapper around the ``apt`` C++ library
|
|
which is tightly connected to the version of the distribution it was
|
|
shipped on. It is not developed in a backward/forward compatible manner.
|
|
|
|
This in turn makes it incredibly hard to distribute as a wheel for a piece
|
|
of python software that supports a span of distro releases [0][1].
|
|
|
|
Upstream feedback like [2] does not give confidence in this ever changing,
|
|
so with this we get rid of the dependency.
|
|
|
|
0: https://github.com/juju-solutions/layer-basic/pull/135
|
|
1: https://bugs.launchpad.net/charm-octavia/+bug/1824112
|
|
2: https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=845330#10
|
|
"""
|
|
|
|
import locale
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
|
|
from charmhelpers import deprecate
|
|
from charmhelpers.core.hookenv import log
|
|
|
|
|
|
class _container(dict):
|
|
"""Simple container for attributes."""
|
|
__getattr__ = dict.__getitem__
|
|
__setattr__ = dict.__setitem__
|
|
|
|
|
|
class Package(_container):
|
|
"""Simple container for package attributes."""
|
|
|
|
|
|
class Version(_container):
|
|
"""Simple container for version attributes."""
|
|
|
|
|
|
class Cache(object):
|
|
"""Simulation of ``apt_pkg`` Cache object."""
|
|
def __init__(self, progress=None):
|
|
pass
|
|
|
|
def __contains__(self, package):
|
|
try:
|
|
pkg = self.__getitem__(package)
|
|
return pkg is not None
|
|
except KeyError:
|
|
return False
|
|
|
|
def __getitem__(self, package):
|
|
"""Get information about a package from apt and dpkg databases.
|
|
|
|
:param package: Name of package
|
|
:type package: str
|
|
:returns: Package object
|
|
:rtype: object
|
|
:raises: KeyError, subprocess.CalledProcessError
|
|
"""
|
|
apt_result = self._apt_cache_show([package])[package]
|
|
apt_result['name'] = apt_result.pop('package')
|
|
pkg = Package(apt_result)
|
|
dpkg_result = self.dpkg_list([package]).get(package, {})
|
|
current_ver = None
|
|
installed_version = dpkg_result.get('version')
|
|
if installed_version:
|
|
current_ver = Version({'ver_str': installed_version})
|
|
pkg.current_ver = current_ver
|
|
pkg.architecture = dpkg_result.get('architecture')
|
|
return pkg
|
|
|
|
@deprecate("use dpkg_list() instead.", "2022-05", log=log)
|
|
def _dpkg_list(self, packages):
|
|
return self.dpkg_list(packages)
|
|
|
|
def dpkg_list(self, packages):
|
|
"""Get data from system dpkg database for package.
|
|
|
|
Note that this method is also useful for querying package names
|
|
containing wildcards, for example
|
|
|
|
apt_cache().dpkg_list(['nvidia-vgpu-ubuntu-*'])
|
|
|
|
may return
|
|
|
|
{
|
|
'nvidia-vgpu-ubuntu-470': {
|
|
'name': 'nvidia-vgpu-ubuntu-470',
|
|
'version': '470.68',
|
|
'architecture': 'amd64',
|
|
'description': 'NVIDIA vGPU driver - version 470.68'
|
|
}
|
|
}
|
|
|
|
:param packages: Packages to get data from
|
|
:type packages: List[str]
|
|
:returns: Structured data about installed packages, keys like
|
|
``dpkg-query --list``
|
|
:rtype: dict
|
|
:raises: subprocess.CalledProcessError
|
|
"""
|
|
pkgs = {}
|
|
cmd = [
|
|
'dpkg-query', '--show',
|
|
'--showformat',
|
|
r'${db:Status-Abbrev}\t${Package}\t${Version}\t${Architecture}\t${binary:Summary}\n'
|
|
]
|
|
cmd.extend(packages)
|
|
try:
|
|
output = subprocess.check_output(cmd,
|
|
stderr=subprocess.STDOUT,
|
|
universal_newlines=True)
|
|
except subprocess.CalledProcessError as cp:
|
|
# ``dpkg-query`` may return error and at the same time have
|
|
# produced useful output, for example when asked for multiple
|
|
# packages where some are not installed
|
|
if cp.returncode != 1:
|
|
raise
|
|
output = cp.output
|
|
for line in output.splitlines():
|
|
# only process lines for successfully installed packages
|
|
if not (line.startswith('ii ') or line.startswith('hi ')):
|
|
continue
|
|
status, name, version, arch, desc = line.split('\t', 4)
|
|
pkgs[name] = {
|
|
'name': name,
|
|
'version': version,
|
|
'architecture': arch,
|
|
'description': desc,
|
|
}
|
|
return pkgs
|
|
|
|
def _apt_cache_show(self, packages):
|
|
"""Get data from system apt cache for package.
|
|
|
|
:param packages: Packages to get data from
|
|
:type packages: List[str]
|
|
:returns: Structured data about package, keys like
|
|
``apt-cache show``
|
|
:rtype: dict
|
|
:raises: subprocess.CalledProcessError
|
|
"""
|
|
pkgs = {}
|
|
cmd = ['apt-cache', 'show', '--no-all-versions']
|
|
cmd.extend(packages)
|
|
if locale.getlocale() == (None, None):
|
|
# subprocess calls out to locale.getpreferredencoding(False) to
|
|
# determine encoding. Workaround for Trusty where the
|
|
# environment appears to not be set up correctly.
|
|
locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')
|
|
try:
|
|
output = subprocess.check_output(cmd,
|
|
stderr=subprocess.STDOUT,
|
|
universal_newlines=True)
|
|
previous = None
|
|
pkg = {}
|
|
for line in output.splitlines():
|
|
if not line:
|
|
if 'package' in pkg:
|
|
pkgs.update({pkg['package']: pkg})
|
|
pkg = {}
|
|
continue
|
|
if line.startswith(' '):
|
|
if previous and previous in pkg:
|
|
pkg[previous] += os.linesep + line.lstrip()
|
|
continue
|
|
if ':' in line:
|
|
kv = line.split(':', 1)
|
|
key = kv[0].lower()
|
|
if key == 'n':
|
|
continue
|
|
previous = key
|
|
pkg.update({key: kv[1].lstrip()})
|
|
except subprocess.CalledProcessError as cp:
|
|
# ``apt-cache`` returns 100 if none of the packages asked for
|
|
# exist in the apt cache.
|
|
if cp.returncode != 100:
|
|
raise
|
|
return pkgs
|
|
|
|
|
|
class Config(_container):
|
|
def __init__(self):
|
|
super(Config, self).__init__(self._populate())
|
|
|
|
def _populate(self):
|
|
cfgs = {}
|
|
cmd = ['apt-config', 'dump']
|
|
output = subprocess.check_output(cmd,
|
|
stderr=subprocess.STDOUT,
|
|
universal_newlines=True)
|
|
for line in output.splitlines():
|
|
if not line.startswith("CommandLine"):
|
|
k, v = line.split(" ", 1)
|
|
cfgs[k] = v.strip(";").strip("\"")
|
|
|
|
return cfgs
|
|
|
|
|
|
# Backwards compatibility with old apt_pkg module
|
|
sys.modules[__name__].config = Config()
|
|
|
|
|
|
def init():
|
|
"""Compatibility shim that does nothing."""
|
|
pass
|
|
|
|
|
|
def upstream_version(version):
|
|
"""Extracts upstream version from a version string.
|
|
|
|
Upstream reference: https://salsa.debian.org/apt-team/apt/blob/master/
|
|
apt-pkg/deb/debversion.cc#L259
|
|
|
|
:param version: Version string
|
|
:type version: str
|
|
:returns: Upstream version
|
|
:rtype: str
|
|
"""
|
|
if version:
|
|
version = version.split(':')[-1]
|
|
version = version.split('-')[0]
|
|
return version
|
|
|
|
|
|
def version_compare(a, b):
|
|
"""Compare the given versions.
|
|
|
|
Call out to ``dpkg`` to make sure the code doing the comparison is
|
|
compatible with what the ``apt`` library would do. Mimic the return
|
|
values.
|
|
|
|
Upstream reference:
|
|
https://apt-team.pages.debian.net/python-apt/library/apt_pkg.html
|
|
?highlight=version_compare#apt_pkg.version_compare
|
|
|
|
:param a: version string
|
|
:type a: str
|
|
:param b: version string
|
|
:type b: str
|
|
:returns: >0 if ``a`` is greater than ``b``, 0 if a equals b,
|
|
<0 if ``a`` is smaller than ``b``
|
|
:rtype: int
|
|
:raises: subprocess.CalledProcessError, RuntimeError
|
|
"""
|
|
for op in ('gt', 1), ('eq', 0), ('lt', -1):
|
|
try:
|
|
subprocess.check_call(['dpkg', '--compare-versions',
|
|
a, op[0], b],
|
|
stderr=subprocess.STDOUT,
|
|
universal_newlines=True)
|
|
return op[1]
|
|
except subprocess.CalledProcessError as cp:
|
|
if cp.returncode == 1:
|
|
continue
|
|
raise
|
|
else:
|
|
raise RuntimeError('Unable to compare "{}" and "{}", according to '
|
|
'our logic they are neither greater, equal nor '
|
|
'less than each other.'.format(a, b))
|
|
|
|
|
|
class PkgVersion():
|
|
"""Allow package versions to be compared.
|
|
|
|
For example::
|
|
|
|
>>> import charmhelpers.fetch as fetch
|
|
>>> (fetch.apt_pkg.PkgVersion('2:20.4.0') <
|
|
... fetch.apt_pkg.PkgVersion('2:20.5.0'))
|
|
True
|
|
>>> pkgs = [fetch.apt_pkg.PkgVersion('2:20.4.0'),
|
|
... fetch.apt_pkg.PkgVersion('2:21.4.0'),
|
|
... fetch.apt_pkg.PkgVersion('2:17.4.0')]
|
|
>>> pkgs.sort()
|
|
>>> pkgs
|
|
[2:17.4.0, 2:20.4.0, 2:21.4.0]
|
|
"""
|
|
|
|
def __init__(self, version):
|
|
self.version = version
|
|
|
|
def __lt__(self, other):
|
|
return version_compare(self.version, other.version) == -1
|
|
|
|
def __le__(self, other):
|
|
return self.__lt__(other) or self.__eq__(other)
|
|
|
|
def __gt__(self, other):
|
|
return version_compare(self.version, other.version) == 1
|
|
|
|
def __ge__(self, other):
|
|
return self.__gt__(other) or self.__eq__(other)
|
|
|
|
def __eq__(self, other):
|
|
return version_compare(self.version, other.version) == 0
|
|
|
|
def __ne__(self, other):
|
|
return not self.__eq__(other)
|
|
|
|
def __repr__(self):
|
|
return self.version
|
|
|
|
def __hash__(self):
|
|
return hash(repr(self))
|