James Page 309d6b0df6 Resync charmhelpers for py3 fixes.
Change-Id: I30dbafafaf4704783b6d451c7230469e79e5bf4a
2017-11-13 12:30:09 +00:00

151 lines
4.2 KiB
Python

# Copyright 2014-2017 Canonical Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Charm helpers snap for classic charms.
If writing reactive charms, use the snap layer:
https://lists.ubuntu.com/archives/snapcraft/2016-September/001114.html
"""
import subprocess
import os
from time import sleep
from charmhelpers.core.hookenv import log
__author__ = 'Joseph Borg <joseph.borg@canonical.com>'
# The return code for "couldn't acquire lock" in Snap
# (hopefully this will be improved).
SNAP_NO_LOCK = 1
SNAP_NO_LOCK_RETRY_DELAY = 10 # Wait X seconds between Snap lock checks.
SNAP_NO_LOCK_RETRY_COUNT = 30 # Retry to acquire the lock X times.
SNAP_CHANNELS = [
'edge',
'beta',
'candidate',
'stable',
]
class CouldNotAcquireLockException(Exception):
pass
class InvalidSnapChannel(Exception):
pass
def _snap_exec(commands):
"""
Execute snap commands.
:param commands: List commands
:return: Integer exit code
"""
assert type(commands) == list
retry_count = 0
return_code = None
while return_code is None or return_code == SNAP_NO_LOCK:
try:
return_code = subprocess.check_call(['snap'] + commands,
env=os.environ)
except subprocess.CalledProcessError as e:
retry_count += + 1
if retry_count > SNAP_NO_LOCK_RETRY_COUNT:
raise CouldNotAcquireLockException(
'Could not aquire lock after {} attempts'
.format(SNAP_NO_LOCK_RETRY_COUNT))
return_code = e.returncode
log('Snap failed to acquire lock, trying again in {} seconds.'
.format(SNAP_NO_LOCK_RETRY_DELAY, level='WARN'))
sleep(SNAP_NO_LOCK_RETRY_DELAY)
return return_code
def snap_install(packages, *flags):
"""
Install a snap package.
:param packages: String or List String package name
:param flags: List String flags to pass to install command
:return: Integer return code from snap
"""
if type(packages) is not list:
packages = [packages]
flags = list(flags)
message = 'Installing snap(s) "%s"' % ', '.join(packages)
if flags:
message += ' with option(s) "%s"' % ', '.join(flags)
log(message, level='INFO')
return _snap_exec(['install'] + flags + packages)
def snap_remove(packages, *flags):
"""
Remove a snap package.
:param packages: String or List String package name
:param flags: List String flags to pass to remove command
:return: Integer return code from snap
"""
if type(packages) is not list:
packages = [packages]
flags = list(flags)
message = 'Removing snap(s) "%s"' % ', '.join(packages)
if flags:
message += ' with options "%s"' % ', '.join(flags)
log(message, level='INFO')
return _snap_exec(['remove'] + flags + packages)
def snap_refresh(packages, *flags):
"""
Refresh / Update snap package.
:param packages: String or List String package name
:param flags: List String flags to pass to refresh command
:return: Integer return code from snap
"""
if type(packages) is not list:
packages = [packages]
flags = list(flags)
message = 'Refreshing snap(s) "%s"' % ', '.join(packages)
if flags:
message += ' with options "%s"' % ', '.join(flags)
log(message, level='INFO')
return _snap_exec(['refresh'] + flags + packages)
def valid_snap_channel(channel):
""" Validate snap channel exists
:raises InvalidSnapChannel: When channel does not exist
:return: Boolean
"""
if channel.lower() in SNAP_CHANNELS:
return True
else:
raise InvalidSnapChannel("Invalid Snap Channel: {}".format(channel))