charm-percona-cluster/charmhelpers/contrib/storage/linux/lvm.py

183 lines
5.5 KiB
Python

# Copyright 2014-2015 Canonical Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
from subprocess import (
CalledProcessError,
check_call,
check_output,
Popen,
PIPE,
)
##################################################
# LVM helpers.
##################################################
def deactivate_lvm_volume_group(block_device):
    '''
    Deactivate the volume group, if any, that an LVM physical volume
    belongs to.

    The group is looked up with list_lvm_volume_group(); when that yields
    nothing, no command is run.

    :param block_device: str: Full path to LVM physical volume
    :raises subprocess.CalledProcessError: if vgchange exits non-zero.
    '''
    volume_group = list_lvm_volume_group(block_device)
    if not volume_group:
        # No group reported for this PV, so there is nothing to deactivate.
        return
    check_call(['vgchange', '-an', volume_group])
def is_lvm_physical_volume(block_device):
    '''
    Report whether a block device has been initialized as an LVM PV.

    The check is simply whether ``pvdisplay <block_device>`` exits
    successfully; its output is discarded.

    :param block_device: str: Full path of block device to inspect.
    :returns: boolean: True if block device is a PV, False if not.
    '''
    try:
        check_output(['pvdisplay', block_device])
    except CalledProcessError:
        # A non-zero exit from pvdisplay is read as "not a PV".
        return False
    else:
        return True
def remove_lvm_physical_volume(block_device):
    '''
    Remove LVM PV signatures from a given block device.

    Runs ``pvremove -ff <block_device>`` and writes ``y`` followed by a
    newline to its stdin (presumably to answer a confirmation prompt).
    The exit status of pvremove is not checked.

    :param block_device: str: Full path of block device to scrub.
    '''
    p = Popen(['pvremove', '-ff', block_device],
              stdin=PIPE)
    # The pipe is opened in binary mode, so the input must be bytes; a str
    # here raises TypeError on Python 3.
    p.communicate(input=b'y\n')
def list_lvm_volume_group(block_device):
    '''
    Look up the LVM volume group that a block device belongs to.

    Parses the output of ``pvdisplay <block_device>`` for a line starting
    with "VG Name". Assumes block device is a valid LVM PV.

    :param block_device: str: Full path of block device to inspect.
    :returns: str: Name of volume group associated with block device, or
                   None when pvdisplay prints no "VG Name" line.
    :raises subprocess.CalledProcessError: if pvdisplay exits non-zero.
    '''
    marker = 'VG Name'
    volume_group = None
    raw_output = check_output(['pvdisplay', block_device])
    for raw_line in raw_output.splitlines():
        text = raw_line.decode('UTF-8').strip()
        if not text.startswith(marker):
            continue
        # Drop the two label tokens ("VG", "Name"); the rest is the name.
        # No early exit: if several lines match, the last one wins.
        volume_group = ' '.join(text.split()[2:])
    return volume_group
def create_lvm_physical_volume(block_device):
    '''
    Turn a block device into an LVM physical volume via ``pvcreate``.

    :param block_device: str: Full path of block device to initialize.
    :raises subprocess.CalledProcessError: if pvcreate exits non-zero.
    '''
    command = ['pvcreate', block_device]
    check_call(command)
def create_lvm_volume_group(volume_group, block_device):
    '''
    Create an LVM volume group on top of a given block device via
    ``vgcreate``.

    Assumes block device has already been initialized as an LVM PV.

    :param volume_group: str: Name of volume group to create.
    :param block_device: str: Full path of PV-initialized block device.
    :raises subprocess.CalledProcessError: if vgcreate exits non-zero.
    '''
    command = ['vgcreate', volume_group, block_device]
    check_call(command)
def list_logical_volumes(select_criteria=None, path_mode=False):
    '''
    List logical volumes as reported by ``lvs``.

    :param select_criteria: str: Limit list to those volumes matching this
                                 criteria (see 'lvs -S help' for more details)
    :param path_mode: bool: return logical volume name in 'vg/lv' format, this
                            format is required for some commands like lvextend
    :returns: [str]: List of logical volumes
    :raises subprocess.CalledProcessError: if lvs exits non-zero.
    '''
    # In path mode the parsing below relies on the column order:
    # volume group first, logical volume second.
    columns = 'vg_name,lv_name' if path_mode else 'lv_name'
    command = ['lvs', '--options', columns, '--noheadings']
    if select_criteria:
        command += ['--select', select_criteria]

    report = check_output(command).decode('UTF-8')
    # Only completely empty lines are dropped; the check happens before any
    # whitespace is stripped.
    rows = [row for row in report.splitlines() if row]
    if path_mode:
        # Join the whitespace-separated columns into 'vg/lv'.
        return ['/'.join(row.split()) for row in rows]
    return [row.strip() for row in rows]
# Convenience variants of list_logical_volumes with select_criteria pre-bound
# to a regex match on the start of the lv_attr field. Because these are
# functools.partial objects, callers can still pass path_mode (or override
# select_criteria) by keyword.
# NOTE(review): going by the names, '^t' is meant to match thin pools and
# '^V' thin volumes -- confirm against the lv_attr description in lvs(8).
list_thin_logical_volume_pools = functools.partial(
list_logical_volumes,
select_criteria='lv_attr =~ ^t')
list_thin_logical_volumes = functools.partial(
list_logical_volumes,
select_criteria='lv_attr =~ ^V')
def extend_logical_volume_by_device(lv_name, block_device):
    '''
    Grow logical volume lv_name using the free space on physical volume
    block_device, by running ``lvextend <lv_name> <block_device>``.

    :param lv_name: str: name of logical volume to be extended (vg/lv format)
    :param block_device: str: name of block_device to be allocated to lv_name
    :raises subprocess.CalledProcessError: if lvextend exits non-zero.
    '''
    check_call(['lvextend', lv_name, block_device])
def create_logical_volume(lv_name, volume_group, size=None):
    '''
    Create a new logical volume inside an existing volume group.

    :param lv_name: str: name of logical volume to be created.
    :param volume_group: str: Name of volume group to use for the new volume.
    :param size: str: Size of logical volume to create. Any falsy value
                      (the default None included) means "use 100% of the
                      free space in the group".
    :raises subprocess.CalledProcessError: in the event that the lvcreate fails.
    '''
    # lvcreate takes an explicit size through -L, whereas "all remaining
    # space" has to be requested through -l, so the two cases need
    # different options.
    if size:
        size_args = ['-L', '{}'.format(size)]
    else:
        size_args = ['-l', '100%FREE']

    command = ['lvcreate', '--yes'] + size_args + ['-n', lv_name, volume_group]
    check_call(command)