Fuxi code for providing Cinder volume to Container

Provide create, rm, delete, list, get, mount and unmount Cinder volume
for Docker Container

Change-Id: I5f37b4a0781ebc21ef054dbcfa5a822404fc522d
This commit is contained in:
zhangni 2016-05-26 15:45:43 +08:00
parent e986576ed2
commit 7fdade1c5a
44 changed files with 3262 additions and 37 deletions

View File

@ -22,7 +22,6 @@ sys.path.insert(0, os.path.abspath('../..'))
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = [
'sphinx.ext.autodoc',
#'sphinx.ext.intersphinx',
'oslosphinx'
]
@ -72,4 +71,4 @@ latex_documents = [
]
# Example configuration for intersphinx: refer to the Python standard library.
#intersphinx_mapping = {'http://docs.python.org/': None}
# intersphinx_mapping = {'http://docs.python.org/': None}

View File

@ -0,0 +1,4 @@
[DEFAULT]
output_file = etc/fuxi.conf.sample
wrap_width = 79
namespace = fuxi

23
etc/fuxi.conf Normal file
View File

@ -0,0 +1,23 @@
[DEFAULT]
fuxi_port = 7879
my_ip =
volume_providers = cinder
volume_from = fuxi
default_volume_size = 1
volume_dir = /fuxi/data
threaded = true
debug = False
[keystone]
region =
auth_url =
admin_user =
admin_password =
admin_tenant_name =
admin_token =
auth_insecure = true
[cinder]
volume_connector = osbrick
multiattach = false
fstype = ext4

4
etc/fuxi.json Normal file
View File

@ -0,0 +1,4 @@
{
"Name": "fuxi",
"Addr": "http://127.0.0.1:7879"
}

27
etc/rootwrap.conf Normal file
View File

@ -0,0 +1,27 @@
# Configuration for fuxi-rootwrap
# This file should be owned by (and only-writeable by) the root user
[DEFAULT]
# List of directories to load filter definitions from (separated by ',').
# These directories MUST all be only writeable by root !
filters_path=/etc/fuxi/rootwrap.d
# List of directories to search executables in, in case filters do not
# explicitely specify a full path (separated by ',')
# If not specified, defaults to system PATH environment variable.
# These directories MUST all be only writeable by root !
exec_dirs=/sbin,/usr/sbin,/bin,/usr/bin,/usr/local/bin,/usr/local/sbin
# Enable logging to syslog
# Default value is False
use_syslog=False
# Which syslog facility to use.
# Valid values include auth, authpriv, syslog, local0, local1...
# Default value is 'syslog'
syslog_log_facility=syslog
# Which messages to log.
# INFO means log all usage
# ERROR means only log unsuccessful attempts
syslog_log_level=ERROR

View File

@ -0,0 +1,31 @@
# fuxi-rootwrap command filters
# This file should be owned by (and only-writeable by) the root user
[Filters]
# os-brick library commands
# os_brick.privileged.run_as_root oslo.privsep context
# This line ties the superuser privs with the config files, context name,
# and (implicitly) the actual python code invoked.
privsep-rootwrap: RegExpFilter, privsep-helper, root, privsep-helper, --config-file, /etc/(?!\.\.).*, --privsep_context, os_brick.privileged.default, --privsep_sock_path, /tmp/.*
# The following and any cinder/brick/* entries should all be obsoleted
# by privsep, and may be removed once the os-brick version requirement
# is updated appropriately.
scsi_id: CommandFilter, /lib/udev/scsi_id, root
drbdadm: CommandFilter, drbdadm, root
iscsiadm: CommandFilter, iscsiadm, root
sg_scan: CommandFilter, sg_scan, root
systool: CommandFilter, systool, root
cat: CommandFilter, cat, root
# fuxi/connector/cloudconnector/openstack.py
ln: CommandFilter, ln, root
# fuxi/blockdevice.py
mount: CommandFilter, mount, root
umount: CommandFilter, umount, root
mkfs: CommandFilter, mkfs, root
mkdir: CommandFilter, mkdir, root
tee: CommandFilter, tee, root
ls: CommandFilter, ls, root
rm: CommandFilter, rm, root

View File

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import pbr.version
from fuxi import utils
__version__ = pbr.version.VersionInfo(
'fuxi').version_string()
app = utils.make_json_app(__name__)

0
fuxi/common/__init__.py Normal file
View File

View File

@ -0,0 +1,36 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import glob
from oslo_log import log as logging
from oslo_utils import units
from fuxi import exceptions
from fuxi.i18n import _LE
LOG = logging.getLogger(__name__)
class BlockerDeviceManager(object):
def device_scan(self):
return glob.glob('/sys/block/*')
def get_device_size(self, device):
try:
nr_sectors = open(device + '/size').read().rstrip('\n')
sect_size = open(device + '/queue/hw_sector_size')\
.read().rstrip('\n')
return (float(nr_sectors) * float(sect_size)) / units.Gi
except IOError as e:
LOG.error(_LE("Failed to read device size. {0}").format(e))
raise exceptions.FuxiException(e.message)

108
fuxi/common/config.py Normal file
View File

@ -0,0 +1,108 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslo_config import cfg
from oslo_log import log as logging
from fuxi import i18n
from fuxi.version import version_info
_ = i18n._
default_opts = [
cfg.StrOpt('my_ip',
help=_('IP address of this machine.')),
cfg.IntOpt('fuxi_port',
default=7879,
help=_('Port for fuxi volume driver server.')),
cfg.StrOpt('volume_dir',
default='/fuxi/data',
help=_('At which the docker volume will create.')),
cfg.ListOpt('volume_providers',
help=_('Volume storage backends that provide volume for '
'Docker')),
cfg.StrOpt('volume_from',
default='fuxi',
help=_('Setting label for volume.')),
cfg.IntOpt('default_volume_size',
default=1,
help=_('Default size for volume.')),
cfg.BoolOpt('threaded',
default=True,
help=_('Make this volume plugin run in multi-thread.')),
cfg.StrOpt('rootwrap_config',
default='/etc/fuxi/rootwrap.conf'),
]
keystone_opts = [
cfg.StrOpt('region',
default=os.environ.get('REGION'),
help=_('The region that this machine belongs to.')),
cfg.StrOpt('auth_url',
default=os.environ.get('IDENTITY_URL'),
help=_('The URL for accessing the identity service.')),
cfg.StrOpt('admin_user',
default=os.environ.get('SERVICE_USER'),
help=_('The username to auth with the identity service.')),
cfg.StrOpt('admin_tenant_name',
default=os.environ.get('SERVICE_TENANT_NAME'),
help=_('The tenant name to auth with the identity service.')),
cfg.StrOpt('admin_password',
default=os.environ.get('SERVICE_PASSWORD'),
help=_('The password to auth with the identity service.')),
cfg.StrOpt('admin_token',
default=os.environ.get('SERVICE_TOKEN'),
help=_('The admin token.')),
cfg.StrOpt('auth_ca_cert',
default=os.environ.get('SERVICE_CA_CERT'),
help=_('The CA certification file.')),
cfg.BoolOpt('auth_insecure',
default=True,
help=_("Turn off verification of the certificate for ssl.")),
]
cinder_opts = [
cfg.StrOpt('volume_connector',
default='osbrick',
help=_('Volume connector for attach volume to this server, '
'or detach volume from this server.')),
cfg.StrOpt('availability_zone',
default=None,
help=_('AZ in which the current machine creates, '
'and volume is going to create.')),
cfg.StrOpt('volume_type',
default=None,
help=_('Volume type to create volume.')),
cfg.StrOpt('fstype',
default='ext4',
help=_('Default filesystem type for volume.')),
cfg.BoolOpt('multiattach',
default=False,
help=_('Allow the volume to be attached to more than '
'one instance.'))
]
CONF = cfg.CONF
CONF.register_opts(default_opts)
CONF.register_opts(keystone_opts, group='keystone')
CONF.register_opts(cinder_opts, group='cinder')
# Setting oslo.log options for logging.
logging.register_options(CONF)
def init(args, **kwargs):
cfg.CONF(args=args, project='fuxi',
version=version_info.release_string(), **kwargs)

44
fuxi/common/constants.py Normal file
View File

@ -0,0 +1,44 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
VOLUME_FROM = 'volume_from'
DOCKER_VOLUME_NAME = 'docker_volume_name'
# Volume states
UNKNOWN = 'unknown'
NOT_ATTACH = 'not_attach'
ATTACH_TO_THIS = 'attach_to_this'
ATTACH_TO_OTHER = 'attach_to_other'
# If volume_provider is cinder, and if cinder volume is attached to this server
# by Nova, an link file will create under this directory to match attached
# volume. Of course, creating link file will decrease interact time
# with backend providers in some cases.
VOLUME_LINK_DIR = '/dev/disk/by-id/'
# Volume scanning interval
VOLUME_SCAN_TIME_DELAY = 0.3
# Timeout for destroying volume from backend provider
DESTROY_VOLUME_TIMEOUT = 300
# Timeout for monitoring volume status
MONITOR_STATE_TIMEOUT = 600
# Device scan interval
DEVICE_SCAN_TIME_DELAY = 0.3
# Timeout for scanning device
DEVICE_SCAN_TIMEOUT = 10
# Timeout for querying meta-data from localhost
CURL_MD_TIMEOUT = 10

152
fuxi/common/mount.py Normal file
View File

@ -0,0 +1,152 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from fuxi import exceptions
from fuxi.i18n import _
from fuxi import utils
from oslo_concurrency import processutils
from oslo_log import log as logging
from oslo_utils import excutils
proc_mounts_path = '/proc/mounts'
LOG = logging.getLogger(__name__)
class MountInfo(object):
def __init__(self, device, mountpoint, fstype, opts):
self.device = device
self.mountpoint = mountpoint
self.fstype = fstype
self.opts = opts
def __repr__(self, *args, **kwargs):
return str(self.__dict__)
class Mounter(object):
def make_filesystem(self, devpath, fstype):
try:
utils.execute('mkfs', '-t', fstype, '-F', devpath,
run_as_root=True)
except processutils.ProcessExecutionError as e:
msg = _("Unexpected error while make filesystem. "
"Devpath: {0}, "
"Fstype: {1}"
"Error: {2}").format(devpath, fstype, e)
raise exceptions.MakeFileSystemException(msg)
def mount(self, devpath, mountpoint, fstype=None):
try:
if fstype:
utils.execute('mount', '-t', fstype, devpath, mountpoint,
run_as_root=True)
else:
utils.execute('mount', devpath, mountpoint,
run_as_root=True)
except processutils.ProcessExecutionError as e:
msg = _("Unexpected error while mount block device. "
"Devpath: {0}, "
"Mountpoint: {1} "
"Error: {2}").format(devpath, mountpoint, e)
raise exceptions.MountException(msg)
def unmount(self, mountpoint):
try:
utils.execute('umount', mountpoint, run_as_root=True)
except processutils.ProcessExecutionError as e:
msg = _("Unexpected err while unmount block device. "
"Mountpoint: {0}, "
"Error: {1}").format(mountpoint, e)
raise exceptions.UnmountException(msg)
def read_mounts(self, filter_device=(), filter_fstype=()):
"""Read all mounted filesystems.
Read all mounted filesystems except filtered option.
:param filter_device: Filter for device, the result will not contain
the mounts whose device argument in it.
:param filter_fstype: Filter for mount point.
:return: All mounts.
"""
try:
(out, err) = processutils.execute('cat', proc_mounts_path,
check_exit_code=0)
except processutils.ProcessExecutionError:
msg = _("Failed to read mounts.")
raise exceptions.FileNotFound(msg)
lines = out.split('\n')
mounts = []
for line in lines:
if not line:
continue
tokens = line.split()
if len(tokens) < 4:
continue
if tokens[0] in filter_device or tokens[1] in filter_fstype:
continue
mounts.append(MountInfo(device=tokens[0], mountpoint=tokens[1],
fstype=tokens[2], opts=tokens[3]))
return mounts
def get_mps_by_device(self, devpath):
"""Get all mountpoints that device mounted on.
:param devpath: The path of mount device.
:return: All mountpoints.
:rtype: list
"""
mps = []
mounts = self.read_mounts()
for m in mounts:
if devpath == m.device:
mps.append(m.mountpoint)
return mps
def check_already_mounted(devpath, mountpoint):
"""Check that the mount device is mounted on the specific mount point.
:param devpath: The path of mount deivce.
:param mountpoint: The path of mount point.
:rtype: bool
"""
mounts = Mounter().read_mounts()
for m in mounts:
if devpath == m.device and mountpoint == m.mountpoint:
return True
return False
def do_mount(devpath, mountpoint, fstype):
"""Execute device mount operation.
:param devpath: The path of mount device.
:param mountpoint: The path of mount point.
:param fstype: The file system type.
"""
try:
if check_already_mounted(devpath, mountpoint):
return
mounter = Mounter()
mounter.mount(devpath, mountpoint, fstype)
except exceptions.MountException:
try:
mounter.make_filesystem(devpath, fstype)
mounter.mount(devpath, mountpoint, fstype)
except exceptions.FuxiException as e:
with excutils.save_and_reraise_exception():
LOG.error(e.message)

View File

@ -0,0 +1,84 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from fuxi.common import constants
from fuxi import exceptions
from fuxi.i18n import _LE
from cinderclient import exceptions as cinder_exception
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class StateMonitor(object):
"""Monitor the status of Volume.
Because of some volume operation is asynchronous, such as creating Cinder
volume, this volume could be used for next stop util reached an desired
state.
"""
def __init__(self, client, expected_obj,
desired_state,
transient_states=(),
time_limit=constants.MONITOR_STATE_TIMEOUT,
time_delay=1):
self.client = client
self.expected_obj = expected_obj
self.desired_state = desired_state
self.transient_states = transient_states
self.time_limit = time_limit
self.start_time = time.time()
self.time_delay = time_delay
def _reached_desired_state(self, current_state):
if current_state == self.desired_state:
return True
elif current_state in self.transient_states:
idx = self.transient_states.index(current_state)
if idx > 0:
self.transient_states = self.transient_states[idx:]
return False
else:
msg = _LE("Unexpected state while waiting for volume. "
"Expected Volume: {0}, "
"Expected State: {1}, "
"Reached State: {2}").format(self.expected_obj,
self.desired_state,
current_state)
LOG.error(msg)
raise exceptions.UnexpectedStateException(msg)
def monitor_cinder_volume(self):
while True:
try:
volume = self.client.volumes.get(self.expected_obj.id)
except cinder_exception.ClientException:
elapsed_time = time.time() - self.start_time
if elapsed_time > self.time_limit:
msg = _LE("Timed out while waiting for volume. "
"Expected Volume: {0}, "
"Expected State: {1}, "
"Elapsed Time: {2}").format(self.expected_obj,
self.desired_state,
elapsed_time)
LOG.error(msg)
raise exceptions.TimeoutException(msg)
raise
if self._reached_desired_state(volume.status):
return volume
time.sleep(self.time_delay)

View File

View File

@ -0,0 +1,147 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import time
from cinderclient import exceptions as cinder_exception
from novaclient import exceptions as nova_exception
from oslo_concurrency import lockutils
from oslo_concurrency import processutils
from oslo_log import log as logging
from fuxi.common import blockdevice
from fuxi.common import config
from fuxi.common import constants as consts
from fuxi.common import state_monitor
from fuxi.connector import connector
from fuxi import exceptions
from fuxi.i18n import _, _LI, _LW, _LE
from fuxi import utils
CONF = config.CONF
LOG = logging.getLogger(__name__)
class CinderConnector(connector.Connector):
def __init__(self):
super(CinderConnector, self).__init__()
self.cinderclient = utils.get_cinderclient()
self.novaclient = utils.get_novaclient()
@lockutils.synchronized('openstack-attach-volume')
def connect_volume(self, volume, **connect_opts):
bdm = blockdevice.BlockerDeviceManager()
ori_devices = bdm.device_scan()
# Do volume-attach
try:
server_id = connect_opts.get('server_id', None)
if not server_id:
server_id = utils.get_instance_uuid()
LOG.info(_LI("Start to connect to volume {0}").format(volume))
nova_volume = self.novaclient.volumes.create_server_volume(
server_id=server_id,
volume_id=volume.id,
device=None)
volume_monitor = state_monitor.StateMonitor(
self.cinderclient,
nova_volume,
'in-use',
('available', 'attaching',))
attached_volume = volume_monitor.monitor_cinder_volume()
except nova_exception.ClientException as ex:
LOG.error(_LE("Attaching volume {0} to server {1} "
"failed. Error: {2}").format(volume.id,
server_id, ex))
raise
# Get all devices on host after do volume-attach,
# and then find attached device.
LOG.info(_LI("After connected to volume, scan the added "
"block device on host"))
curr_devices = bdm.device_scan()
start_time = time.time()
delta_devices = list(set(curr_devices) - set(ori_devices))
while not delta_devices:
time.sleep(consts.DEVICE_SCAN_TIME_DELAY)
curr_devices = bdm.device_scan()
delta_devices = list(set(curr_devices) - set(ori_devices))
if time.time() - start_time > consts.DEVICE_SCAN_TIMEOUT:
msg = _("Could not detect added device with "
"limited time")
raise exceptions.FuxiException(msg)
LOG.info(_LI("Get extra added block device {0}"
"").format(delta_devices))
for device in delta_devices:
if bdm.get_device_size(device) == volume.size:
device = device.replace('/sys/block', '/dev')
msg = _LI("Find attached device {0} for volume {1} "
"{2}").format(device,
attached_volume.name,
volume)
LOG.info(msg)
link_path = os.path.join(consts.VOLUME_LINK_DIR, volume.id)
try:
utils.execute('ln', '-s', device,
link_path,
run_as_root=True)
except processutils.ProcessExecutionError as e:
msg = _LE("Error happened when create link file for "
"block device attached by Nova. "
"Error: {0}").format(e)
LOG.error(msg)
raise
return {'path': link_path}
LOG.warm(_LW("Could not find matched device"))
raise exceptions.NotFound("Not Found Matched Device")
def disconnect_volume(self, volume, **disconnect_opts):
try:
volume = self.cinderclient.volumes.get(volume.id)
except cinder_exception.ClientException as e:
msg = _LE("Get Volume {0} from Cinder failed").format(volume.id)
LOG.error(msg)
raise
try:
link_path = self.get_device_path(volume)
utils.execute('rm', '-f', link_path, run_as_root=True)
except processutils.ProcessExecutionError as e:
msg = _LE("Error happened when remove docker volume "
"mountpoint directory. Error: {0}").format(e)
LOG.warn(msg)
try:
self.novaclient.volumes.delete_server_volume(
utils.get_instance_uuid(),
volume.id)
except nova_exception.ClientException as e:
msg = _LE("Detaching volume {0} failed. "
"Err: {1}").format(volume.id, e)
LOG.error(msg)
raise
volume_monitor = state_monitor.StateMonitor(self.cinderclient,
volume,
'available',
('in-use', 'detaching',))
return volume_monitor.monitor_cinder_volume()
def get_device_path(self, volume):
return os.path.join(consts.VOLUME_LINK_DIR, volume.id)

View File

@ -0,0 +1,35 @@
# Copyright 2013 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Connector(object):
def __init__(self):
pass
@abc.abstractmethod
def connect_volume(self, volume, **connect_opts):
pass
@abc.abstractmethod
def disconnect_volume(self, volume, **disconnect_opts):
pass
@abc.abstractmethod
def get_device_path(self, volume):
pass

View File

@ -0,0 +1,174 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from os_brick.initiator import connector
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from fuxi.common import constants as consts
from fuxi.connector import connector as fuxi_connector
from fuxi.i18n import _LI, _LE
from fuxi import utils
from cinderclient import exceptions as cinder_exception
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def brick_get_connector_properties(multipath=False, enforce_multipath=False):
"""Wrapper to automatically set root_helper in brick calls.
:param multipath: A boolean indicating whether the connector can
support multipath.
:param enforce_multipath: If True, it raises exception when multipath=True
is specified but multipathd is not running.
If False, it falls back to multipath=False
when multipathd is not running.
"""
root_helper = utils.get_root_helper()
return connector.get_connector_properties(root_helper,
CONF.my_ip,
multipath,
enforce_multipath)
def brick_get_connector(protocol, driver=None,
execute=processutils.execute,
use_multipath=False,
device_scan_attempts=3,
*args, **kwargs):
"""Wrapper to get a brick connector object.
This automatically populates the required protocol as well
as the root_helper needed to execute commands.
"""
root_helper = utils.get_root_helper()
return connector.InitiatorConnector.factory(
protocol, root_helper,
driver=driver,
execute=execute,
use_multipath=use_multipath,
device_scan_attempts=device_scan_attempts,
*args, **kwargs)
class CinderConnector(fuxi_connector.Connector):
def __init__(self):
super(CinderConnector, self).__init__()
self.cinderclient = utils.get_cinderclient()
def _get_connection_info(self, volume_id):
LOG.info(_LI("Get connection info for osbrick connector and use it to "
"connect to volume"))
try:
conn_info = self.cinderclient.volumes.initialize_connection(
volume_id,
brick_get_connector_properties())
msg = _LI("Get connection information {0}").format(conn_info)
LOG.info(msg)
return conn_info
except cinder_exception.ClientException as e:
msg = _LE("Error happened when initialize connection for volume. "
"Error: {0}").format(e)
LOG.error(msg)
raise
def _connect_volume(self, volume):
conn_info = self._get_connection_info(volume.id)
protocol = conn_info['driver_volume_type']
brick_connector = brick_get_connector(protocol)
device_info = brick_connector.connect_volume(conn_info['data'])
LOG.info(_LI("Get device_info after connect to "
"volume %s") % device_info)
try:
link_path = os.path.join(consts.VOLUME_LINK_DIR, volume.id)
utils.execute('ln', '-s', os.path.realpath(device_info['path']),
link_path,
run_as_root=True)
except processutils.ProcessExecutionError as e:
LOG.error(_LE("Failed to create link for device. %s"), e)
raise
return {'path': link_path, 'iscsi_path': device_info['path']}
def _disconnect_volume(self, volume):
try:
link_path = self.get_device_path(volume)
utils.execute('rm', '-f', link_path, run_as_root=True)
except processutils.ProcessExecutionError as e:
msg = _LE("Error happened when remove docker volume "
"mountpoint directory. Error: {0}").format(e)
LOG.warn(msg)
conn_info = self._get_connection_info(volume.id)
protocol = conn_info['driver_volume_type']
brick_get_connector(protocol).disconnect_volume(conn_info['data'],
None)
def connect_volume(self, volume, **connect_opts):
mountpoint = connect_opts.get('mountpoint', None)
host_name = utils.get_hostname()
try:
self.cinderclient.volumes.reserve(volume)
except cinder_exception.ClientException:
LOG.error(_LE("Reserve volume %s failed"), volume)
raise
try:
device_info = self._connect_volume(volume)
self.cinderclient.volumes.attach(volume=volume,
instance_uuid=None,
mountpoint=mountpoint,
host_name=host_name)
LOG.info(_LI("Attach volume to this server successfully"))
except Exception:
LOG.error(_LE("Attach volume %s to this server failed"), volume)
with excutils.save_and_reraise_exception():
try:
self._disconnect_volume(volume)
except Exception:
pass
self.cinderclient.volumes.unreserve(volume)
return device_info
def disconnect_volume(self, volume, **disconnect_opts):
self._disconnect_volume(volume)
attachments = volume.attachments
attachment_uuid = None
for am in attachments:
if am['host_name'].lower() == utils.get_hostname().lower():
attachment_uuid = am['attachment_id']
break
try:
self.cinderclient.volumes.detach(volume.id,
attachment_uuid=attachment_uuid)
LOG.info(_LI("Disconnect volume successfully"))
except cinder_exception.ClientException as e:
msg = _LE("Error happened when detach volume {0} {1} from this "
"server. Error: {2}").format(volume.name, volume, e)
LOG.error(msg)
raise
def get_device_path(self, volume):
return os.path.join(consts.VOLUME_LINK_DIR, volume.id)

227
fuxi/controllers.py Normal file
View File

@ -0,0 +1,227 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import flask
import os
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_utils import importutils
from fuxi import app
from fuxi import exceptions
from fuxi.i18n import _, _LI, _LW
from fuxi import utils
CONF = cfg.CONF
CINDER = 'cinder'
volume_providers_conf = {
CINDER: 'fuxi.volumeprovider.cinder.Cinder', }
def init_app_conf():
# Init volume providers.
volume_providers = CONF.volume_providers
if not volume_providers:
raise Exception("Must define volume providers in configuration file")
app.volume_providers = collections.OrderedDict()
for provider in volume_providers:
if provider in volume_providers_conf:
app.volume_providers[provider] = importutils\
.import_class(volume_providers_conf[provider])()
app.logger.info(_LI("Load volume provider: {0}").format(provider))
else:
msg = _LW("Could not find volume provider: {0}").format(provider)
app.logger.warn(msg)
if not app.volume_providers:
raise Exception("Not provide at least one effective volume provider")
# Init volume store directory.
try:
volume_dir = CONF.volume_dir
if not os.path.exists(volume_dir) or not os.path.isdir(volume_dir):
utils.execute('mkdir', '-p', '-m=700', volume_dir,
run_as_root=True)
except processutils.ProcessExecutionError:
raise
def get_docker_volume(docker_volume_name):
for provider in app.volume_providers.values():
try:
return provider.show(docker_volume_name)
except exceptions.NotFound:
pass
return None
@app.route('/Plugin.Activate', methods=['POST'])
def plugin_activate():
app.logger.info(_LI("/Plugin.Activate"))
return flask.jsonify(Implements=[u'VolumeDriver'])
@app.route('/VolumeDriver.Create', methods=['POST'])
def volumedriver_create():
json_data = flask.request.get_json(force=True)
app.logger.info(_LI("Received JSON data {0} for "
"/VolumeDriver.Create").format(json_data))
docker_volume_name = json_data.get('Name', None)
volume_opts = json_data.get('Opts', None) or {}
if not docker_volume_name:
msg = _("Request /VolumeDriver.Create need parameter 'Name'")
app.logger.error(msg)
raise exceptions.InvalidInput(msg)
if not isinstance(volume_opts, dict):
msg = _("Request parameter 'Opts' must be dict type")
app.logger.error(msg)
raise exceptions.InvalidInput(msg)
volume_provider_type = volume_opts.get('volume_provider', None)
if not volume_provider_type:
volume_provider_type = app.volume_providers.keys()[0]
if volume_provider_type not in app.volume_providers:
msg_fmt = _("Could not find a handler for %(volume_provider_type)s "
"volume") % {'volume_provider_type': volume_provider_type}
app.logger.error(msg_fmt)
return flask.jsonify(Err=msg_fmt)
# If the volume with the same name already exists in other volume
# provider backend, then raise an error
for vpt, provider in app.volume_providers.items():
if volume_provider_type != vpt \
and provider.check_exist(docker_volume_name):
msg_fmt = _("The volume with the same name already exists in "
"other volume provider backend")
app.logger.error(msg_fmt)
return flask.jsonify(Err=msg_fmt)
# Create if volume does not exist, or attach to this server if needed
# if volume exists in related volume provider.
app.volume_providers[volume_provider_type].create(docker_volume_name,
volume_opts)
return flask.jsonify(Err=u'')
@app.route('/VolumeDriver.Remove', methods=['POST'])
def volumedriver_remove():
json_data = flask.request.get_json(force=True)
app.logger.info(_LI("Received JSON data {0} for "
"/VolumeDriver.Remove").format(json_data))
docker_volume_name = json_data.get('Name', None)
if not docker_volume_name:
msg = _("Request /VolumeDriver.Remove need parameter 'Name'")
app.logger.error(msg)
raise exceptions.InvalidInput(msg)
for provider in app.volume_providers.values():
if provider.delete(docker_volume_name):
return flask.jsonify(Err=u'')
return flask.jsonify(Err=u'')
@app.route('/VolumeDriver.Mount', methods=['POST'])
def volumedriver_mount():
json_data = flask.request.get_json(force=True)
app.logger.info(_LI("Receive JSON data {0} for "
"/VolumeDriver.Mount").format(json_data))
docker_volume_name = json_data.get('Name', None)
if not docker_volume_name:
msg = _("Request /VolumeDriver.Mount need parameter 'Name'")
app.logger.error(msg)
raise exceptions.InvalidInput(msg)
for provider in app.volume_providers.values():
if provider.check_exist(docker_volume_name):
mountpoint = provider.mount(docker_volume_name)
return flask.jsonify(Mountpoint=mountpoint, Err=u'')
return flask.jsonify(Err=u'Mount Failed')
@app.route('/VolumeDriver.Path', methods=['POST'])
def volumedriver_path():
json_data = flask.request.get_json(force=True)
app.logger.info(_LI("Receive JSON data {0} for "
"/VolumeDriver.Path").format(json_data))
docker_volume_name = json_data.get('Name', None)
if not docker_volume_name:
msg = _("Request /VolumeDriver.Path need parameter 'Name'")
app.logger.error(msg)
raise exceptions.InvalidInput(msg)
volume = get_docker_volume(docker_volume_name)
if volume is not None:
mountpoint = volume.get('Mountpoint', '')
app.logger.info("Get mountpoint %(mp)s for docker volume %(name)s"
% {'mp': mountpoint, 'name': docker_volume_name})
return flask.jsonify(Mountpoint=mountpoint, Err=u'')
app.logger.warn(_LW("Can't find mountpoint for docker volume "
"%(name)s") % {'name': docker_volume_name})
return flask.jsonify(Err=u'Mountpoint Not Found')
@app.route('/VolumeDriver.Unmount', methods=['POST'])
def volumedriver_unmount():
json_data = flask.request.get_json(force=True)
app.logger.info(_LI('Receive JSON data {0} for '
'VolumeDriver.Unmount').format(json_data))
return flask.jsonify(Err=u'')
@app.route('/VolumeDriver.Get', methods=['POST'])
def volumedriver_get():
json_data = flask.request.get_json(force=True)
app.logger.info(_LI("Receive JSON data {0} for "
"/VolumeDriver.Get").format(json_data))
docker_volume_name = json_data.get('Name', None)
if not docker_volume_name:
msg = _("Request /VolumeDriver.Get need parameter 'Name'")
app.logger.error(msg)
raise exceptions.InvalidInput(msg)
volume = get_docker_volume(docker_volume_name)
if volume is not None:
msg_fmt = _LI("Get docker volume: {0}").format(volume)
app.logger.info(msg_fmt)
return flask.jsonify(Volume=volume, Err=u'')
app.logger.warn(_LW("Can't find volume {0} from every "
"provider").format(docker_volume_name))
return flask.jsonify(Err=u'Volume Not Found')
@app.route('/VolumeDriver.List', methods=['POST'])
def volumedriver_list():
app.logger.info(_LI("/VolumeDriver.List"))
docker_volumes = []
for provider in app.volume_providers.values():
vs = provider.list()
if vs:
docker_volumes.extend(vs)
app.logger.info(_LI("Get volumes from volume providers. "
"Volumes: {0}").format(docker_volumes))
return flask.jsonify(Err=u'', Volumes=docker_volumes)

60
fuxi/exceptions.py Normal file
View File

@ -0,0 +1,60 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class FuxiException(Exception):
"""Default Kuryr exception"""
class TimeoutException(FuxiException):
"""A timeout on waiting for volume to reach destination end state."""
class UnexpectedStateException(FuxiException):
"""Unexpected volume state appeared"""
class LoopExceeded(FuxiException):
"""Raised when ``loop_until`` looped too many times."""
class NotFound(FuxiException):
"""The resource could not be found"""
class TooManyResources(FuxiException):
"""Find too many resources."""
class InvalidInput(FuxiException):
"""Request data is invalidate"""
class NotMatchedState(FuxiException):
"""Current state not match to expected state"""
message = "Current state not match to expected state."
class MakeFileSystemException(FuxiException):
"""Unexpected error while make file system."""
class MountException(FuxiException):
"""Unexpected error while mount device."""
class UnmountException(FuxiException):
"""Unexpected error while do umount"""
class FileNotFound(FuxiException):
"""The expected file not exist"""

42
fuxi/i18n.py Normal file
View File

@ -0,0 +1,42 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_i18n
DOMAIN = "fuxi"
_translators = oslo_i18n.TranslatorFactory(domain=DOMAIN)
# The primary translation function using the well-known name "_"
_ = _translators.primary
# The contextual translation function using the name "_C"
_C = _translators.contextual_form
# The plural translation function using the name "_P"
_P = _translators.plural_form
# Translators for log levels.
#
# The abbreviated names are meant to reflect the usual use of a short
# name like '_'. The "L" is for "log" and the other letter comes from
# the level.
_LI = _translators.log_info
_LW = _translators.log_warning
_LE = _translators.log_error
_LC = _translators.log_critical
def get_available_languages():
return oslo_i18n.get_available_languages(DOMAIN)

25
fuxi/opts.py Normal file
View File

@ -0,0 +1,25 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__all__ = [
'list_fuxi_opts',
]
import itertools
from fuxi.common import config
def list_fuxi_opts():
return [('DEFAULT', itertools.chain(config.default_opts,)),
('keystone', itertools.chain(config.keystone_opts,)),
('cinder', itertools.chain(config.cinder_opts,)), ]

31
fuxi/server.py Normal file
View File

@ -0,0 +1,31 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from fuxi import app
from fuxi.common import config
from fuxi import controllers
from oslo_log import log as logging
def start():
config.init(sys.argv[1:])
logging.setup(config.CONF, 'fuxi')
controllers.init_app_conf()
port = config.CONF.fuxi_port
app.run("0.0.0.0", port,
debug=config.CONF.debug,
threaded=config.CONF.threaded)

View File

@ -21,3 +21,5 @@ from oslotest import base
class TestCase(base.BaseTestCase):
"""Test case base class for all unit tests."""
def setUp(self):
super(TestCase, self).setUp()

View File

View File

@ -0,0 +1,120 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from fuxi.common import mount
from fuxi import exceptions
from fuxi.tests import base
class FakeMounter(object):
def __init__(self, mountinfo=None):
self.mountinfo = "/dev/0 /path/to/0 type0 flags 0 0\n" \
"/dev/1 /path/to/1 type1 flags 0 0\n" \
"/dev/2 /path/to/2 type2 flags,1,2=3 0 0\n" \
if not mountinfo else mountinfo
def mount(self, devpath, mountpoint, fstype=None):
if not fstype:
fstype = 'ext4'
self.mountinfo += ' '.join([devpath, mountpoint, fstype,
'flags', '0', '0\n'])
def unmount(self, mountpoint):
mounts = self.read_mounts()
ori_len = len(mounts)
for m in mounts:
if m.mountpoint == mountpoint:
mounts.remove(m)
if ori_len != len(mounts):
self.mountinfo = ''.join([' '.join([m.device, m.mountpoint,
m.fstype, m.opts,
'0', '0\n'])
for m in mounts])
else:
raise exceptions.UnmountException()
def read_mounts(self, filter_device=(), filter_fstype=()):
lines = self.mountinfo.split('\n')
mounts = []
for line in lines:
if not line:
continue
tokens = line.split()
if len(tokens) < 4:
continue
if tokens[0] in filter_device or tokens[1] in filter_fstype:
continue
mounts.append(mount.MountInfo(device=tokens[0],
mountpoint=tokens[1],
fstype=tokens[2], opts=tokens[3]))
return mounts
def get_mps_by_device(self, devpath):
mps = []
mounts = self.read_mounts()
for m in mounts:
if devpath in m.device:
mps.append(m.mountpoint)
return mps
def check_already_mounted(devpath, mountpoint):
mounts = FakeMounter().read_mounts()
for m in mounts:
if m.device == devpath and m.mountpoint == mountpoint:
return True
return False
class TestMounter(base.TestCase):
def setUp(self):
super(TestMounter, self).setUp()
def test_mount(self):
fake_devpath = '/dev/3'
fake_mp = '/path/to/3'
fake_fstype = 'ext4'
fake_mounter = FakeMounter()
fake_mounter.mount(fake_devpath, fake_mp, fake_fstype)
fake_mountinfo = "/dev/0 /path/to/0 type0 flags 0 0\n" \
"/dev/1 /path/to/1 type1 flags 0 0\n" \
"/dev/2 /path/to/2 type2 flags,1,2=3 0 0\n" \
"/dev/3 /path/to/3 ext4 flags 0 0\n"
self.assertEqual(fake_mountinfo, fake_mounter.mountinfo)
def test_unmount(self):
fake_mp = '/path/to/2'
fake_mounter = FakeMounter()
fake_mounter.unmount(fake_mp)
fake_mountinfo = "/dev/0 /path/to/0 type0 flags 0 0\n" \
"/dev/1 /path/to/1 type1 flags 0 0\n"
self.assertEqual(fake_mountinfo, fake_mounter.mountinfo)
def test_read_mounts(self):
fake_mounts = [str(mount.MountInfo('/dev/0', '/path/to/0',
'type0', 'flags')),
str(mount.MountInfo('/dev/1', '/path/to/1',
'type1', 'flags')),
str(mount.MountInfo('/dev/2', '/path/to/2',
'type2', 'flags,1,2=3'))]
mounts = [str(m) for m in FakeMounter().read_mounts()]
self.assertEqual(len(fake_mounts), len(mounts))
for m in mounts:
self.assertIn(m, fake_mounts)
def test_get_mps_by_device(self):
self.assertEqual(['/path/to/0'],
FakeMounter().get_mps_by_device('/dev/0'))
def test_check_alread_mounted(self):
self.assertTrue(check_already_mounted('/dev/0', '/path/to/0'))
self.assertFalse(check_already_mounted('/dev/0', '/path/to/1'))

View File

@ -0,0 +1,81 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from cinderclient import exceptions as cinder_exception
from fuxi.common import state_monitor
from fuxi import exceptions
from fuxi.tests import base, fake_client, fake_object
class TestStateMonitor(base.TestCase):
def setUp(self):
super(TestStateMonitor, self).setUp()
def test_monitor_cinder_volume(self):
fake_cinder_client = fake_client.FakeCinderClient()
fake_cinder_volume = fake_object.FakeCinderVolume(status='available')
fake_desired_state = 'in-use'
fake_transient_states = ('in-use',)
fake_time_limit = 0
fake_state_monitor = state_monitor.StateMonitor(fake_cinder_client,
fake_cinder_volume,
fake_desired_state,
fake_transient_states,
fake_time_limit)
fake_desired_volume = fake_object.FakeCinderVolume(status='in-use')
with mock.patch.object(fake_client.FakeCinderClient.Volumes, 'get',
return_value=fake_desired_volume):
self.assertEqual(fake_desired_volume,
fake_state_monitor.monitor_cinder_volume())
def test_monitor_cinder_volume_get_failed(self):
fake_cinder_client = fake_client.FakeCinderClient()
fake_cinder_volume = fake_object.FakeCinderVolume(status='available')
with mock.patch('fuxi.tests.fake_client.FakeCinderClient.Volumes.get',
side_effect=cinder_exception.ClientException(404)):
fake_state_monitor = state_monitor.StateMonitor(fake_cinder_client,
fake_cinder_volume,
None, None, -1)
self.assertRaises(exceptions.TimeoutException,
fake_state_monitor.monitor_cinder_volume)
with mock.patch('fuxi.tests.fake_client.FakeCinderClient.Volumes.get',
side_effect=cinder_exception.ClientException(404)):
fake_state_monitor = state_monitor.StateMonitor(fake_cinder_client,
fake_cinder_volume,
None, None)
self.assertRaises(cinder_exception.ClientException,
fake_state_monitor.monitor_cinder_volume)
def test_monitor_cinder_volume_unexpected_state(self):
fake_cinder_client = fake_client.FakeCinderClient()
fake_cinder_volume = fake_object.FakeCinderVolume(status='available')
fake_desired_state = 'in-use'
fake_transient_states = ('in-use',)
fake_time_limit = 0
fake_state_monitor = state_monitor.StateMonitor(fake_cinder_client,
fake_cinder_volume,
fake_desired_state,
fake_transient_states,
fake_time_limit)
fake_desired_volume = fake_object.FakeCinderVolume(status='attaching')
with mock.patch.object(fake_client.FakeCinderClient.Volumes, 'get',
return_value=fake_desired_volume):
self.assertRaises(exceptions.UnexpectedStateException,
fake_state_monitor.monitor_cinder_volume)

View File

View File

@ -0,0 +1,105 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import os
from fuxi.common import constants
from fuxi.common import state_monitor
from fuxi.connector.cloudconnector import openstack
from fuxi import utils
from fuxi.tests import base, fake_client, fake_object
from cinderclient import exceptions as cinder_exception
from novaclient import exceptions as nova_exception
def mock_list_with_attach_to_this(cls, search_opts={}):
attachments = [{u'server_id': u'123',
u'attachment_id': u'123',
u'attached_at': u'2016-05-20T09:19:57.000000',
u'host_name': None,
u'device': None,
u'id': u'123'}]
return [fake_object.FakeCinderVolume(name='fake-vol1',
attachments=attachments)]
def mock_list_with_attach_to_other(cls, search_opts={}):
attachments = [{u'server_id': u'1234',
u'attachment_id': u'123',
u'attached_at': u'2016-05-20T09:19:57.000000',
u'host_name': None,
u'device': None,
u'id': u'123'}]
return [fake_object.FakeCinderVolume(name='fake-vol1',
attachments=attachments)]
def mock_get_mountpoint_for_device(devpath, mountpoint):
return ''
class TestCinderConnector(base.TestCase):
def setUp(self):
base.TestCase.setUp(self)
self.connector = openstack.CinderConnector()
self.connector.cinderclient = fake_client.FakeCinderClient()
self.connector.novaclient = fake_client.FakeNovaClient()
def test_connect_volume(self):
pass
@mock.patch.object(utils, 'get_instance_uuid', return_value='fake-123')
@mock.patch.object(utils, 'execute')
@mock.patch.object(state_monitor.StateMonitor, 'monitor_cinder_volume',
return_value=None)
def test_disconnect_volume(self, mock_inst_id, mock_execute, mock_monitor):
fake_cinder_volume = fake_object.FakeCinderVolume()
result = self.connector.disconnect_volume(fake_cinder_volume)
self.assertIsNone(result)
@mock.patch('fuxi.tests.fake_client.FakeCinderClient.Volumes.get',
side_effect=cinder_exception.ClientException(404))
@mock.patch.object(utils, 'execute')
@mock.patch.object(state_monitor.StateMonitor,
'monitor_cinder_volume')
def test_disconnect_volume_for_not_found(self, mock_get, mock_execute,
mocK_monitor):
fake_cinder_volume = fake_object.FakeCinderVolume()
self.assertRaises(cinder_exception.ClientException,
self.connector.disconnect_volume,
fake_cinder_volume)
@mock.patch('fuxi.tests.fake_client.FakeNovaClient.Volumes'
'.delete_server_volume',
side_effect=nova_exception.ClientException(500))
@mock.patch.object(utils, 'get_instance_uuid', return_value='fake-123')
@mock.patch.object(utils, 'execute')
@mock.patch.object(state_monitor.StateMonitor,
'monitor_cinder_volume')
def test_disconnect_volume_for_delete_server_volume_failed(self,
mock_delete,
mock_inst_id,
mock_execute,
mock_monitor):
fake_cinder_volume = fake_object.FakeCinderVolume()
self.assertRaises(nova_exception.ClientException,
self.connector.disconnect_volume,
fake_cinder_volume)
def test_get_device_path(self):
fake_cinder_volume = fake_object.FakeCinderVolume()
fake_devpath = os.path.join(constants.VOLUME_LINK_DIR,
fake_cinder_volume.id)
self.assertEqual(fake_devpath,
self.connector.get_device_path(fake_cinder_volume))

View File

@ -0,0 +1,168 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import os
import platform
import socket
import sys
from fuxi.common import constants
from fuxi.connector import osbrickconnector
from fuxi.tests import base, fake_client, fake_object
from fuxi import utils
from cinderclient import exceptions as cinder_exception
from oslo_concurrency import processutils
def mock_get_connector_properties(multipath=False, enforce_multipath=False):
props = {}
props['host'] = socket.gethostname()
props['initiator'] = 'iqn.1993-08.org.debian:01:b57cc344932'
props['platform'] = platform.machine()
props['os_type'] = sys.platform
return props
def mock_list_with_attach_to_this(cls, search_opts={}):
attachments = [{u'server_id': u'123',
u'attachment_id': u'123',
u'attached_at': u'2016-05-20T09:19:57.000000',
u'host_name': utils.get_hostname(),
u'device': None,
u'id': u'123'}]
return [fake_object.FakeCinderVolume(name='fake-vol1',
attachments=attachments)]
def mock_list_with_attach_to_other(cls, search_opts={}):
attachments = [{u'server_id': u'123',
u'attachment_id': u'123',
u'attached_at': u'2016-05-20T09:19:57.000000',
u'host_name': utils.get_hostname() + u'other',
u'device': None,
u'id': u'123'}]
return [fake_object.FakeCinderVolume(name='fake-vol1',
attachments=attachments)]
def mock_get_mountpoint_for_device(devpath, mountpoint):
return ''
class TestCinderConnector(base.TestCase):
def setUp(self):
base.TestCase.setUp(self)
self.connector = osbrickconnector.CinderConnector()
self.connector.cinderclient = fake_client.FakeCinderClient()
def test_connect_volume(self):
fake_cinder_volume = fake_object.FakeCinderVolume()
self.connector._connect_volume = mock.MagicMock()
self.connector.connect_volume(fake_cinder_volume)
self.assertEqual(1, len(fake_cinder_volume.attachments))
@mock.patch.object(osbrickconnector, 'brick_get_connector',
return_value=fake_client.FakeOSBrickConnector())
@mock.patch.object(utils, 'execute')
def test_disconnect_volume(self, mock_brick_connector, mock_execute):
attachments = [{u'server_id': u'123',
u'attachment_id': u'123',
u'attached_at': u'2016-05-20T09:19:57.000000',
u'host_name': utils.get_hostname(),
u'device': None,
u'id': u'123'}]
fake_cinder_volume = \
fake_object.FakeCinderVolume(attachments=attachments)
self.connector._get_connection_info = mock.MagicMock()
self.connector.cinderclient.volumes.detach = mock.MagicMock()
self.assertIsNone(self.connector.disconnect_volume(fake_cinder_volume))
@mock.patch.object(osbrickconnector, 'brick_get_connector_properties',
mock_get_connector_properties)
@mock.patch.object(utils, 'execute')
@mock.patch('fuxi.tests.fake_client.FakeCinderClient.Volumes'
'.initialize_connection',
side_effect=cinder_exception.ClientException(500))
def test_disconnect_volume_no_connection_info(self, mock_execute,
mock_init_conn):
attachments = [{u'server_id': u'123',
u'attachment_id': u'123',
u'attached_at': u'2016-05-20T09:19:57.000000',
u'host_name': utils.get_hostname(),
u'device': None,
u'id': u'123'}]
fake_cinder_volume = \
fake_object.FakeCinderVolume(attachments=attachments)
self.assertRaises(cinder_exception.ClientException,
self.connector.disconnect_volume,
fake_cinder_volume)
@mock.patch.object(osbrickconnector, 'brick_get_connector',
return_value=fake_client.FakeOSBrickConnector())
@mock.patch.object(osbrickconnector.CinderConnector,
'_get_connection_info',
return_value={'driver_volume_type': 'fake_proto',
'data': {'path': '/dev/0'}})
@mock.patch.object(utils, 'execute')
@mock.patch('fuxi.tests.fake_client.FakeOSBrickConnector'
'.disconnect_volume',
side_effect=processutils.ProcessExecutionError())
def test_disconnect_volume_osbrick_disconnect_failed(self, mock_connector,
mock_init_conn,
mock_execute,
mock_disconnect_vol):
attachments = [{u'server_id': u'123',
u'attachment_id': u'123',
u'attached_at': u'2016-05-20T09:19:57.000000',
u'host_name': utils.get_hostname(),
u'device': None,
u'id': u'123'}]
fake_cinder_volume = \
fake_object.FakeCinderVolume(attachments=attachments)
self.assertRaises(processutils.ProcessExecutionError,
self.connector.disconnect_volume,
fake_cinder_volume)
@mock.patch('fuxi.tests.fake_client.FakeCinderClient.Volumes.detach',
side_effect=cinder_exception.ClientException(500))
@mock.patch.object(osbrickconnector, 'brick_get_connector',
return_value=fake_client.FakeOSBrickConnector())
@mock.patch.object(utils, 'execute')
@mock.patch.object(osbrickconnector.CinderConnector,
'_get_connection_info',
return_value={'driver_volume_type': 'fake_proto',
'data': {'path': '/dev/0'}})
def test_disconnect_volume_detach_failed(self, mock_detach,
mock_brick_connector,
mock_execute,
mock_conn_info):
attachments = [{u'server_id': u'123',
u'attachment_id': u'123',
u'attached_at': u'2016-05-20T09:19:57.000000',
u'host_name': utils.get_hostname(),
u'device': None,
u'id': u'123'}]
fake_cinder_volume = \
fake_object.FakeCinderVolume(attachments=attachments)
self.assertRaises(cinder_exception.ClientException,
self.connector.disconnect_volume,
fake_cinder_volume)
def test_get_device_path(self):
fake_cinder_volume = fake_object.FakeCinderVolume()
self.assertEqual(os.path.join(constants.VOLUME_LINK_DIR,
fake_cinder_volume.id),
self.connector.get_device_path(fake_cinder_volume))

88
fuxi/tests/fake_client.py Normal file
View File

@ -0,0 +1,88 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from fuxi.tests import fake_object
from cinderclient import exceptions as cinder_exception
class FakeCinderClient(object):
class Volumes(object):
def get(self, volume_id):
return fake_object.FakeCinderVolume(id=volume_id)
def list(self, search_opts={}):
return [fake_object.FakeCinderVolume(name='fake-vol1')]
def create(self, *args, **kwargs):
return fake_object.FakeCinderVolume(**kwargs)
def delete(self, volume_id):
return
def attach(self, volume, instance_uuid, mountpoint, host_name):
if not instance_uuid and not host_name:
raise cinder_exception.ClientException
attachment = {u'server_id': instance_uuid,
u'attachment_id': u'123',
u'attached_at': u'2016-05-20T09:19:57.000000',
u'host_name': host_name,
u'device': None,
u'id': u'123'}
volume.attachments.append(attachment)
return volume
def detach(self, volume_id, attachment_uuid):
pass
def initialize_connection(self, volume, connector):
return {'data': {}}
def reserve(self, volume):
return
def update(self, volume, **kwargs):
for key, value in kwargs.items():
if hasattr(volume, key):
setattr(volume, key, value)
def set_metadata(self, volume, metadata):
md = volume.metadata
md.update(metadata)
def __getattr__(self, item):
return None
def __init__(self):
self.volumes = self.Volumes()
class FakeNovaClient(object):
class Volumes(object):
def create_server_volume(self, volume_id):
pass
def delete_server_volume(self, server_id, volume_id):
return None
def __init__(self):
self.volumes = self.Volumes()
class FakeOSBrickConnector(object):
def connect_volume(self, connection_properties):
pass
def disconnect_volume(self, connection_properties, device_info):
pass

51
fuxi/tests/fake_object.py Normal file
View File

@ -0,0 +1,51 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
DEFAULT_VOLUME_ID = 'efd46583-4bf7-40d5-a027-2ee3dbe74f56'
DEFAULT_VOLUME_NAME = 'fake_vol'
base_cinder_volume = {
'attachments': [],
'availability_zone': 'nova',
'id': DEFAULT_VOLUME_ID,
'size': 15,
'display_name': DEFAULT_VOLUME_NAME,
'metadata': {
'readonly': 'False',
'volume_from': 'fuxi',
'fstype': 'ext4',
},
'status': 'available',
'multiattach': 'false',
'volume_type': 'lvmdriver-1',
}
class FakeCinderVolume(object):
def __init__(self, **kwargs):
if 'name' in kwargs:
kwargs['display_name'] = kwargs.pop('name')
volume = (copy.deepcopy(base_cinder_volume))
volume.update(kwargs)
for key, value in volume.items():
setattr(self, key, value)
def get_name(self):
return self.display_name
def set_name(self, name):
self.display_name = name
name = property(get_name, set_name)

View File

@ -18,11 +18,321 @@ test_fuxi
Tests for `fuxi` module.
"""
import collections
import mock
import unittest
from fuxi import app
from fuxi.common import config
from fuxi.controllers import volume_providers_conf
from fuxi import exceptions
from fuxi.tests import base
from oslo_serialization import jsonutils
def fake_mountpoint(name):
volume_dir = config.CONF.volume_dir.rstrip('/')
return ''.join((volume_dir, name))
def fake_volume(name):
volume_dir = config.CONF.volume_dir.rstrip('/')
return {'Name': name, 'Mountpoint': ''.join((volume_dir, name))}
class FakeProvider(object):
def __init__(self, volume_provider_type):
self.volume_provider_type = volume_provider_type
def create(self, docker_volume_name, volume_opts):
pass
def delete(self, docker_volume_name):
pass
def list(self):
pass
def path(self, docker_volume_name):
pass
def show(self, docker_volume_name):
pass
def mount(self, docker_volume_name):
pass
def unmount(self, docker_volume_name):
pass
def check_exist(self, docker_volume_name):
return False
class TestFuxi(base.TestCase):
def setUp(self):
super(TestFuxi, self).setUp()
app.config['DEBUG'] = True
app.config['TESTING'] = True
self.app = app.test_client()
def test_something(self):
pass
def volume_providers_setup(self, volume_provider_types):
if not volume_provider_types:
raise Exception
app.volume_providers = collections.OrderedDict()
for vpt in volume_provider_types:
if vpt in volume_providers_conf:
app.volume_providers[vpt] = FakeProvider(vpt)
def test_plugin_activate(self):
response = self.app.post('/Plugin.Activate')
fake_response = {
u'Implements': [u'VolumeDriver']
}
self.assertEqual(200, response.status_code)
self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_create(self):
self.volume_providers_setup(['cinder'])
fake_request = {
u'Name': u'test-vol',
u'Opts': {u'size': u'1'},
}
for provider in app.volume_providers.values():
provider.check_exist = mock.MagicMock()
provider.check_exist.return_value = False
provider.create = mock.MagicMock()
response = self.app.post('/VolumeDriver.Create',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Err': u''
}
self.assertEqual(200, response.status_code)
self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_create_without_name(self):
self.volume_providers_setup(['cinder'])
fake_request = {u'Opts': {}}
response = self.app.post('VolumeDriver.Create',
content_type='application/json',
data=jsonutils.dumps(fake_request))
self.assertEqual(500, response.status_code)
self.assertIsNotNone(jsonutils.loads(response.data))
def test_volumedriver_create_with_invalid_opts(self):
self.volume_providers_setup(['cinder'])
fake_request = {u'Name': u'test-vol', u'Opts': u'invalid'}
response = self.app.post('VolumeDriver.Create',
content_type='application/json',
data=jsonutils.dumps(fake_request))
self.assertEqual(500, response.status_code)
self.assertIsNotNone(jsonutils.loads(response.data))
def test_volumedriver_create_invalid_volume_provider(self):
self.volume_providers_setup(['cinder'])
fake_request = {
u'Name': u'test-vol',
u'Opts': {u'size': u'1',
u'volume_provider': u'provider'}}
for provider in app.volume_providers.values():
provider.check_exist = mock.MagicMock()
provider.check_exist.return_value = False
provider.create = mock.MagicMock()
response = self.app.post('VolumeDriver.Create',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Err': u''
}
self.assertEqual(200, response.status_code)
self.assertNotEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_remove(self):
self.volume_providers_setup(['cinder'])
fake_request = {
u'Name': u'test-vol'
}
for provider in app.volume_providers.values():
provider.delete = mock.MagicMock()
provider.delete.return_value = True
response = self.app.post('/VolumeDriver.Remove',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Err': u''
}
self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_remove_with_volume_not_exist(self):
self.volume_providers_setup(['cinder'])
fake_request = {
u'Name': u'test-vol',
}
for provider in app.volume_providers.values():
provider.delete = mock.MagicMock()
provider.delete.return_value = False
response = self.app.post('/VolumeDriver.Remove',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Err': u''
}
self.assertEqual(200, response.status_code)
self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_mount(self):
self.volume_providers_setup(['cinder'])
fake_name = u'test-vol'
fake_request = {
u'Name': fake_name
}
for provider in app.volume_providers.values():
provider.check_exist = mock.MagicMock()
provider.check_exist.return_value = True
provider.mount = mock.MagicMock()
provider.mount.return_value = fake_mountpoint(fake_name)
response = self.app.post('/VolumeDriver.Mount',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Mountpoint': fake_mountpoint(fake_name),
u'Err': u''
}
self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_mount_with_volume_not_exist(self):
self.volume_providers_setup(['cinder'])
fake_name = u'test-vol'
fake_request = {
u'Name': fake_name,
}
for provider in app.volume_providers.values():
provider.check_exit = mock.MagicMock()
provider.check_exit.return_value = False
response = self.app.post('/VolumeDriver.Mount',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Mountpoint': fake_mountpoint(fake_name),
u'Err': u''
}
self.assertEqual(200, response.status_code)
self.assertNotEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_path(self):
self.volume_providers_setup(['cinder'])
fake_name = u'test-vol'
fake_request = {
u'Name': fake_name
}
for provider in app.volume_providers.values():
provider.show = mock.MagicMock()
provider.show.return_value = fake_volume(fake_name)
response = self.app.post('/VolumeDriver.Path',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Mountpoint': fake_mountpoint(fake_name),
u'Err': u''
}
self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_path_with_volume_not_exist(self):
self.volume_providers_setup(['cinder'])
fake_docker_volume_name = u'test-vol'
fake_request = {
u'Name': fake_docker_volume_name
}
for provider in app.volume_providers.values():
provider.show = mock.MagicMock(side_effect=exceptions.NotFound)
response = self.app.post('/VolumeDriver.Path',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Err': u'Mountpoint Not Found'
}
self.assertEqual(200, response.status_code)
self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_unmount(self):
self.volume_providers_setup(['cinder'])
fake_request = {
u'Name': u'test-vol'
}
response = self.app.post('/VolumeDriver.Unmount',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Err': u''
}
self.assertEqual(200, response.status_code)
self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_get(self):
self.volume_providers_setup(['cinder'])
fake_name = u'test-vol'
fake_request = {
u'Name': fake_name
}
for provider in app.volume_providers.values():
provider.show = mock.MagicMock()
provider.show.return_value = fake_volume(fake_name)
response = self.app.post('/VolumeDriver.Get',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Volume': {u'Name': fake_name,
u'Mountpoint': fake_mountpoint(fake_name)},
u'Err': u''
}
self.assertEqual(200, response.status_code)
self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_get_with_volume_not_exist(self):
self.volume_providers_setup(['cinder'])
fake_docker_volume_name = u'test-vol'
fake_request = {
u'Name': fake_docker_volume_name
}
for provider in app.volume_providers.values():
provider.show = mock.MagicMock(side_effect=exceptions.NotFound())
response = self.app.post('/VolumeDriver.Get',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Err': u'Volume Not Found'
}
self.assertEqual(200, response.status_code)
self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_list(self):
self.volume_providers_setup(['cinder'])
for provider in app.volume_providers.values():
provider.list = mock.MagicMock()
provider.list.return_value = []
response = self.app.post('/VolumeDriver.List',
content_type='application/json')
fake_response = {
u'Volumes': [],
u'Err': u''
}
self.assertEqual(fake_response, jsonutils.loads(response.data))
if __name__ == '__main__':
unittest.main()

View File

View File

@ -0,0 +1,286 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from mock import mock
import os
import tempfile
from fuxi.common import config
from fuxi.common import constants as consts
from fuxi.common import mount
from fuxi import exceptions
from fuxi.tests import base, fake_client, fake_object
from fuxi import utils
from fuxi.volumeprovider import cinder
from cinderclient import exceptions as cinder_exception
volume_link_dir = consts.VOLUME_LINK_DIR
DEFAULT_VOLUME_ID = fake_object.DEFAULT_VOLUME_ID
CONF = config.CONF
LOG = logging.getLogger(__name__)
class FakeCinderConnector(object):
def __init__(self):
pass
def connect_volume(self, volume, **connect_opts):
return {'path': os.path.join(volume_link_dir, volume.id)}
def disconnect_volume(self, volume, **disconnect_opts):
pass
def get_device_path(self, volume):
return os.path.join(volume_link_dir, volume.id)
def mock_connector(cls):
return FakeCinderConnector()
def mock_monitor_cinder_volume(cls):
return cls.expected_obj
def mock_device_path_for_delete(cls, volume):
return volume.id
class TestCinder(base.TestCase):
volume_provider_type = 'cinder'
def setUp(self):
base.TestCase.setUp(self)
self.cinderprovider = cinder.Cinder()
self.cinderprovider.cinderclient = fake_client.FakeCinderClient()
@mock.patch.object(cinder.Cinder, '_get_connector', mock_connector)
@mock.patch.object(cinder.Cinder, '_get_docker_volume',
return_value=(None, consts.UNKNOWN))
def test_create_with_volume_not_exist(self, mock_docker_volume):
self.assertEqual(os.path.join(volume_link_dir, DEFAULT_VOLUME_ID),
self.cinderprovider.create('fake-vol', {})['path'])
@mock.patch.object(cinder.Cinder, '_get_connector', mock_connector)
def test_create_with_volume_attach_to_this(self):
fake_server_id = 'fake_server_123'
fake_host_name = 'attached_to_this'
fake_volume_args = {'id': DEFAULT_VOLUME_ID,
'status': 'in-use',
'attachments': [{'server_id': fake_server_id,
'host_name': fake_host_name}]
}
fake_cinder_volume = fake_object.FakeCinderVolume(**fake_volume_args)
self.cinderprovider._get_docker_volume = mock.MagicMock()
self.cinderprovider._get_docker_volume.return_value \
= (fake_cinder_volume,
consts.ATTACH_TO_THIS)
self.cinderprovider.cinderclient.volumes.get = mock.MagicMock()
self.cinderprovider.cinderclient.volumes.get.return_value = \
fake_cinder_volume
fake_result = self.cinderprovider.create('fake-vol', {})
self.assertEqual(os.path.join(volume_link_dir, DEFAULT_VOLUME_ID),
fake_result['path'])
@mock.patch.object(cinder.Cinder, '_get_connector', mock_connector)
def test_create_with_volume_no_attach(self):
fake_cinder_volume = fake_object.FakeCinderVolume()
self.cinderprovider._get_docker_volume = mock.MagicMock()
self.cinderprovider._get_docker_volume.return_value \
= (fake_cinder_volume,
consts.NOT_ATTACH)
fake_result = self.cinderprovider.create('fake-vol', {})
self.assertEqual(os.path.join(volume_link_dir, DEFAULT_VOLUME_ID),
fake_result['path'])
@mock.patch.object(cinder.Cinder, '_get_connector', mock_connector)
@mock.patch.object(cinder.Cinder, '_get_docker_volume',
return_value=(fake_object.FakeCinderVolume(
multiattach=True), consts.ATTACH_TO_OTHER))
def test_create_with_multiable_vol_attached_to_other(self,
mock_docker_volume):
self.assertEqual(os.path.join(volume_link_dir,
fake_object.DEFAULT_VOLUME_ID),
self.cinderprovider.create('fake-vol', {})['path'])
@mock.patch.object(cinder.Cinder, '_get_connector', mock_connector)
@mock.patch.object(cinder.Cinder, '_get_docker_volume',
return_value=(fake_object.FakeCinderVolume(
multiattach=False), consts.ATTACH_TO_OTHER))
def test_create_with_volume_attached_to_other(self, mock_docker_volume):
self.assertRaises(exceptions.FuxiException,
self.cinderprovider.create,
'fake-vol',
{})
@mock.patch.object(cinder.Cinder, '_get_connector', mock_connector)
@mock.patch.object(utils, 'execute')
@mock.patch.object(FakeCinderConnector,
'get_device_path',
mock_device_path_for_delete)
def test_delete(self, mock_execute):
fd, tmpfname = tempfile.mkstemp()
attachments = [{u'server_id': u'123',
u'attachment_id': u'123',
u'attached_at': u'2016-05-20T09:19:57.000000',
u'host_name': utils.get_hostname(),
u'device': None,
u'id': u'123'}]
self.cinderprovider._get_docker_volume = mock.MagicMock()
self.cinderprovider._get_docker_volume.return_value = (
fake_object.FakeCinderVolume(id=tmpfname,
attachments=attachments),
consts.ATTACH_TO_THIS)
self.cinderprovider._delete_volume = mock.MagicMock()
self.assertTrue(self.cinderprovider.delete('fake-vol'))
@mock.patch.object(cinder.Cinder, '_get_connector', mock_connector)
@mock.patch.object(cinder.Cinder, '_get_docker_volume',
return_value=(fake_object.FakeCinderVolume(status=None),
None))
def test_delete_not_match_state(self, mock_docker_volume):
self.assertRaises(exceptions.NotMatchedState,
self.cinderprovider.delete,
'fake-vol')
@mock.patch.object(cinder.Cinder, '_get_connector', mock_connector)
@mock.patch.object(utils, 'execute')
@mock.patch.object(FakeCinderConnector,
'get_device_path',
mock_device_path_for_delete)
@mock.patch('fuxi.tests.fake_client.FakeCinderClient.Volumes.delete',
side_effect=cinder_exception.ClientException(500))
def test_delete_failed(self, mock_execute, mock_delete):
fd, tmpfname = tempfile.mkstemp()
attachments = [{u'server_id': u'123',
u'attachment_id': u'123',
u'attached_at': u'2016-05-20T09:19:57.000000',
u'host_name': utils.get_hostname(),
u'device': None,
u'id': u'123'}]
self.cinderprovider._get_docker_volume = mock.MagicMock()
self.cinderprovider._get_docker_volume.return_value = (
fake_object.FakeCinderVolume(id=tmpfname,
attachments=attachments),
consts.ATTACH_TO_THIS)
self.assertRaises(cinder_exception.ClientException,
self.cinderprovider.delete,
'fake-vol')
@mock.patch.object(cinder.Cinder, '_get_connector', mock_connector)
@mock.patch.object(utils, 'execute')
@mock.patch.object(FakeCinderConnector,
'get_device_path',
mock_device_path_for_delete)
def test_delete_timeout(self, mock_execute):
consts.DESTROY_VOLUME_TIMEOUT = 4
fd, tmpfname = tempfile.mkstemp()
attachments = [{u'server_id': u'123',
u'attachment_id': u'123',
u'attached_at': u'2016-05-20T09:19:57.000000',
u'host_name': utils.get_hostname(),
u'device': None,
u'id': u'123'}]
self.cinderprovider._get_docker_volume = mock.MagicMock()
self.cinderprovider._get_docker_volume.return_value = (
fake_object.FakeCinderVolume(id=tmpfname,
attachments=attachments),
consts.ATTACH_TO_THIS)
self.assertRaises(exceptions.TimeoutException,
self.cinderprovider.delete,
'fake-vol')
@mock.patch.object(cinder.Cinder, '_get_connector', mock_connector)
def test_list(self):
docker_volumes = self.cinderprovider.list()
self.assertEqual(docker_volumes, [])
@mock.patch.object(cinder.Cinder, '_get_connector', mock_connector)
@mock.patch('fuxi.tests.fake_client.FakeCinderClient.Volumes.list',
side_effect=cinder_exception.ClientException(500))
def test_list_failed(self, mock_list):
self.assertRaises(cinder_exception.ClientException,
self.cinderprovider.list)
@mock.patch.object(cinder.Cinder, '_get_connector', mock_connector)
@mock.patch.object(utils, 'execute')
@mock.patch.object(cinder.Cinder, '_get_docker_volume',
return_value=(fake_object.FakeCinderVolume(),
consts.ATTACH_TO_THIS))
def test_show_state_attach_to_this(self, mock_execute, mock_docker_volume):
self.assertEqual({'Name': 'fake-vol', 'Mountpoint': ''},
self.cinderprovider.show('fake-vol'))
@mock.patch.object(cinder.Cinder, '_get_connector', mock_connector)
@mock.patch.object(cinder.Cinder, '_get_docker_volume',
return_value=(fake_object.FakeCinderVolume(
status='unknown'), consts.UNKNOWN))
def test_show_state_unknown(self, mock_docker_volume):
self.assertRaises(exceptions.NotFound,
self.cinderprovider.show,
'fake-vol')
@mock.patch.object(cinder.Cinder, '_get_connector', mock_connector)
@mock.patch.object(cinder.Cinder, '_get_docker_volume',
return_value=(fake_object.FakeCinderVolume(status=None),
None))
def test_show_state_not_match(self, mock_docker_volume):
self.assertRaises(exceptions.FuxiException,
self.cinderprovider.show,
'fake-vol')
@mock.patch.object(cinder.Cinder, '_get_connector', mock_connector)
@mock.patch.object(cinder.Cinder, '_get_docker_volume',
return_value=(fake_object.FakeCinderVolume(
name='fake-vol',
status='in-use'), consts.ATTACH_TO_THIS))
@mock.patch.object(cinder.Cinder, '_create_mountpoint')
@mock.patch.object(mount, 'do_mount')
def test_mount(self, mock_docker_volume, mock_create_mp, mock_do_mount):
fd, fake_devpath = tempfile.mkstemp()
fake_link_path = fake_devpath
fake_mountpoint = 'fake-mount-point/'
with mock.patch.object(FakeCinderConnector, 'get_device_path',
return_value=fake_link_path):
with mock.patch.object(cinder.Cinder, '_get_mountpoint',
return_value=fake_mountpoint):
self.assertEqual(fake_mountpoint,
self.cinderprovider.mount('fake-vol'))
def test_unmount(self):
self.assertIsNone(self.cinderprovider.unmount('fake-vol'))
def test_check_exists(self):
self.cinderprovider._get_docker_volume = mock.MagicMock()
self.cinderprovider._get_docker_volume.return_value = (
None,
consts.UNKNOWN)
result = self.cinderprovider.check_exist('fake-vol')
self.assertFalse(result)
self.cinderprovider._get_docker_volume.return_value = (
fake_object.FakeCinderVolume(),
consts.NOT_ATTACH)
result = self.cinderprovider.check_exist('fake-vol')
self.assertTrue(result)

187
fuxi/utils.py Normal file
View File

@ -0,0 +1,187 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import flask
import os
import requests
import socket
import traceback
from fuxi.common import constants
from fuxi import exceptions
from fuxi.i18n import _LW, _LE
from cinderclient import client as cinder_client
from cinderclient import exceptions as cinder_exception
from keystoneauth1.session import Session
from keystoneclient.auth import get_plugin_class
from novaclient import client as nova_client
from novaclient import exceptions as nova_exception
from os_brick import exception as brick_exception
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
from oslo_utils import uuidutils
from werkzeug import exceptions as w_exceptions
cloud_init_conf = '/var/lib/cloud/instances'
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def get_hostname():
return socket.gethostname()
def get_instance_uuid():
try:
inst_uuid = ''
inst_uuid_count = 0
dirs = os.listdir(cloud_init_conf)
for uuid_dir in dirs:
if uuidutils.is_uuid_like(uuid_dir):
inst_uuid = uuid_dir
inst_uuid_count += 1
# If not or not only get on instance_uuid, then search
# it from metadata server.
if inst_uuid_count == 1:
return inst_uuid
except Exception:
LOG.warning(_LW("Get instance_uuid from cloud-init failed"))
try:
resp = requests.get('http://169.254.169.254/openstack',
timeout=constants.CURL_MD_TIMEOUT)
metadata_api_versions = resp.text.split()
metadata_api_versions.sort(reverse=True)
except Exception as e:
LOG.error(_LE("Get metadata apis failed. Error: {}").format(e))
raise exceptions.FuxiException("Metadata API Not Found")
for api_version in metadata_api_versions:
metadata_url = ''.join(['http://169.254.169.254/openstack/',
api_version,
'/meta_data.json'])
try:
resp = requests.get(metadata_url,
timeout=constants.CURL_MD_TIMEOUT)
metadata = resp.json()
if metadata.get('uuid', None):
return metadata['uuid']
except Exception as e:
LOG.warning(_LW("Get instance_uuid from metadata server {0} "
"failed. Error: {1}").format(metadata_url, e))
continue
raise exceptions.FuxiException("Instance UUID Not Found")
# Return all errors as JSON. From http://flask.pocoo.org/snippets/83/
def make_json_app(import_name, **kwargs):
app = flask.Flask(import_name, **kwargs)
@app.errorhandler(exceptions.FuxiException)
@app.errorhandler(cinder_exception.ClientException)
@app.errorhandler(nova_exception.ClientException)
@app.errorhandler(processutils.ProcessExecutionError)
@app.errorhandler(brick_exception.BrickException)
def make_json_error(ex):
app.logger.error(_LE("Unexpected error happened: %s"),
traceback.format_exc())
response = flask.jsonify({"Err": str(ex)})
response.status_code = w_exceptions.InternalServerError.code
if isinstance(ex, w_exceptions.HTTPException):
response.status_code = ex.code
content_type = 'application/vnd.docker.plugins.v1+json; charset=utf-8'
response.headers['Content-Type'] = content_type
return response
for code in w_exceptions.default_exceptions:
app.register_error_handler(code, make_json_error)
return app
def driver_dict_from_config(named_driver_config, *args, **kwargs):
driver_registry = dict()
for driver_str in named_driver_config:
driver_type, _sep, driver = driver_str.partition('=')
driver_class = importutils.import_class(driver)
driver_registry[driver_type] = driver_class(*args, **kwargs)
return driver_registry
def _openstack_auth_from_config(**config):
if config.get('username') and config.get('password'):
plugin_class = get_plugin_class('password')
else:
plugin_class = get_plugin_class('token')
plugin_options = plugin_class.get_options()
plugin_kwargs = {}
for option in plugin_options:
if option.dest in config:
plugin_kwargs[option.dest] = config[option.dest]
return plugin_class(**plugin_kwargs)
def get_keystone_session(**kwargs):
keystone_conf = CONF.keystone
config = {}
config['auth_url'] = keystone_conf.auth_url
config['username'] = keystone_conf.admin_user
config['password'] = keystone_conf.admin_password
config['tenant_name'] = keystone_conf.admin_tenant_name
config['token'] = keystone_conf.admin_token
config.update(kwargs)
if keystone_conf.auth_insecure:
verify = False
else:
verify = keystone_conf.auth_ca_cert
return Session(auth=_openstack_auth_from_config(**config), verify=verify)
def get_cinderclient(session=None, region=None, **kwargs):
if not session:
session = get_keystone_session(**kwargs)
if not region:
region = CONF.keystone['region']
return cinder_client.Client(session=session,
region_name=region,
version=2)
def get_novaclient(session=None, region=None, **kwargs):
if not session:
session = get_keystone_session(**kwargs)
if not region:
region = CONF.keystone['region']
return nova_client.Client(session=session,
region_name=region,
version=2)
def get_root_helper():
return 'sudo fuxi-rootwrap %s' % CONF.rootwrap_config
def execute(*cmd, **kwargs):
if 'run_as_root' in kwargs and 'root_helper' not in kwargs:
kwargs['root_helper'] = get_root_helper()
return processutils.execute(*cmd, **kwargs)

17
fuxi/version.py Normal file
View File

@ -0,0 +1,17 @@
# Copyright 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pbr.version
version_info = pbr.version.VersionInfo('fuxi')

View File

View File

@ -0,0 +1,433 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import time
from cinderclient import exceptions as cinder_exception
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
from oslo_utils import strutils
from fuxi.common import constants as consts
from fuxi.common import mount
from fuxi.common import state_monitor
from fuxi import exceptions
from fuxi.i18n import _, _LE, _LI, _LW
from fuxi import utils
from fuxi.volumeprovider import provider
CONF = cfg.CONF
cinder_conf = CONF.cinder
# Volume states
UNKNOWN = consts.UNKNOWN
NOT_ATTACH = consts.NOT_ATTACH
ATTACH_TO_THIS = consts.ATTACH_TO_THIS
ATTACH_TO_OTHER = consts.ATTACH_TO_OTHER
OPENSTACK = 'openstack'
OSBRICK = 'osbrick'
volume_connector_conf = {
OPENSTACK: 'fuxi.connector.cloudconnector.openstack.CinderConnector',
OSBRICK: 'fuxi.connector.osbrickconnector.CinderConnector'}
LOG = logging.getLogger(__name__)
def get_cinder_volume_kwargs(docker_volume_name, docker_volume_opt):
"""Retrieve parameters for creating Cinder volume.
Retrieve required parameters and remove unsupported arguments from
client input. These parameters are used to create a Cinder volume.
:param docker_volume_name: Name for Cinder volume
:type docker_volume_name: str
:param docker_volume_opt: Optional parameters for Cinder volume
:type docker_volume_opt: dict
:rtype: dict
"""
options = ['size', 'consistencygroup_id', 'snapshot_id', 'source_volid',
'description', 'volume_type', 'user_id', 'project_id',
'availability_zone', 'scheduler_hints', 'source_replica',
'multiattach']
kwargs = {}
if 'size' in docker_volume_opt:
try:
size = int(docker_volume_opt.pop('size'))
except ValueError:
msg = _LE("Volume size must be able to convert to int type")
LOG.error(msg)
raise exceptions.InvalidInput(msg)
else:
size = CONF.default_volume_size
msg = _LI("Volume size doesn't provide from command, so use "
"default size {0}G").format(size)
LOG.info(msg)
kwargs['size'] = size
for key, value in docker_volume_opt.items():
if key in options:
kwargs[key] = value
if not kwargs.get('availability_zone', None):
kwargs['availability_zone'] = cinder_conf.availability_zone
if not kwargs.get('volume_type', None):
kwargs['volume_type'] = cinder_conf.volume_type
kwargs['name'] = docker_volume_name
kwargs['metadata'] = {consts.VOLUME_FROM: CONF.volume_from,
'fstype': kwargs.pop('fstype', cinder_conf.fstype)}
req_multiattach = kwargs.pop('multiattach', cinder_conf.multiattach)
kwargs['multiattach'] = strutils.bool_from_string(req_multiattach,
strict=True)
return kwargs
def get_host_id():
"""Get a value that could represent this server."""
host_id = None
volume_connector = cinder_conf.volume_connector
if volume_connector == OPENSTACK:
host_id = utils.get_instance_uuid()
elif volume_connector == OSBRICK:
host_id = utils.get_hostname().lower()
return host_id
class Cinder(provider.Provider):
volume_provider_type = 'cinder'
def __init__(self):
super(Cinder, self).__init__()
self.cinderclient = utils.get_cinderclient()
def _get_connector(self):
connector = cinder_conf.volume_connector
if not connector or connector not in volume_connector_conf:
msg = _LE("Must provide an valid volume connector")
LOG.error(msg)
raise exceptions.FuxiException(msg)
return importutils.import_class(volume_connector_conf[connector])()
def _get_docker_volume(self, docker_volume_name):
LOG.info(_LI("Retrieve docker volume {0} from "
"Cinder").format(docker_volume_name))
try:
host_id = get_host_id()
volume_connector = cinder_conf.volume_connector
search_opts = {'name': docker_volume_name,
'metadata': {consts.VOLUME_FROM: CONF.volume_from}}
for vol in self.cinderclient.volumes.list(search_opts=search_opts):
if vol.name == docker_volume_name:
if vol.attachments:
for am in vol.attachments:
if volume_connector == OPENSTACK:
if am['server_id'] == host_id:
return vol, ATTACH_TO_THIS
elif volume_connector == OSBRICK:
if (am['host_name'] or '').lower() == host_id:
return vol, ATTACH_TO_THIS
return vol, ATTACH_TO_OTHER
else:
return vol, NOT_ATTACH
return None, UNKNOWN
except cinder_exception.ClientException as ex:
LOG.error(_LE("Error happened while getting volume list "
"information from cinder. Error: {0}").format(ex))
raise
def _check_attached_to_this(self, cinder_volume):
host_id = get_host_id()
vol_conn = cinder_conf.volume_connector
for am in cinder_volume.attachments:
if vol_conn == OPENSTACK and am['server_id'] == host_id:
return True
elif vol_conn == OSBRICK and am['host_name'] \
and am['host_name'].lower() == host_id:
return True
return False
def _create_volume(self, docker_volume_name, volume_opts):
LOG.info(_LI("Start to create docker volume {0} from "
"Cinder").format(docker_volume_name))
cinder_volume_kwargs = get_cinder_volume_kwargs(docker_volume_name,
volume_opts)
try:
volume = self.cinderclient.volumes.create(**cinder_volume_kwargs)
except cinder_exception.ClientException as e:
msg = _LE("Error happened when create an volume {0} from Cinder. "
"Error: {1}").format(docker_volume_name, e)
LOG.error(msg)
raise
LOG.info(_LI("Waiting volume {0} to be available").format(volume))
volume_monitor = state_monitor.StateMonitor(
self.cinderclient,
volume,
'available',
('creating',),
time_delay=consts.VOLUME_SCAN_TIME_DELAY)
volume = volume_monitor.monitor_cinder_volume()
LOG.info(_LI("Create docker volume {0} {1} from Cinder "
"successfully").format(docker_volume_name, volume))
return volume
def create(self, docker_volume_name, volume_opts):
if not volume_opts:
volume_opts = {}
connector = self._get_connector()
cinder_volume, state = self._get_docker_volume(docker_volume_name)
LOG.info(_LI("Get docker volume {0} {1} with state "
"{2}").format(docker_volume_name, cinder_volume, state))
device_info = {}
if state == ATTACH_TO_THIS:
LOG.warn(_LW("The volume {0} {1} already exists and attached to "
"this server").format(docker_volume_name,
cinder_volume))
device_info = {'path': connector.get_device_path(cinder_volume)}
elif state == NOT_ATTACH:
LOG.warn(_LW("The volume {0} {1} is already exists but not "
"attached").format(docker_volume_name,
cinder_volume))
device_info = connector.connect_volume(cinder_volume)
elif state == ATTACH_TO_OTHER:
if cinder_volume.multiattach:
fstype = volume_opts.get('fstype', cinder_conf.fstype)
vol_fstype = cinder_volume.metadata.get('fstype',
cinder_conf.fstype)
if fstype != vol_fstype:
msg = _LE("Volume already exists with fstype: {0}, but "
"currently provided fstype is {1}, not "
"match").format(vol_fstype, fstype)
LOG.error(msg)
raise exceptions.FuxiException('FSType Not Match')
device_info = connector.connect_volume(cinder_volume)
else:
msg = _LE("The volume {0} {1} is already attached to another "
"server").format(docker_volume_name, cinder_volume)
LOG.error(msg)
raise exceptions.FuxiException(msg)
elif state == UNKNOWN:
volume_opts['name'] = docker_volume_name
cinder_volume = self._create_volume(docker_volume_name,
volume_opts)
device_info = connector.connect_volume(cinder_volume)
return device_info
def _delete_volume(self, volume):
try:
self.cinderclient.volumes.delete(volume)
except cinder_exception.NotFound:
return
except cinder_exception.ClientException as e:
msg = _LE("Error happened when delete volume from Cinder. "
"Error: {0}").format(e)
LOG.error(msg)
raise
start_time = time.time()
# Wait until the volume is not there or until the operation timeout
while(time.time() - start_time < consts.DESTROY_VOLUME_TIMEOUT):
try:
self.cinderclient.volumes.get(volume.id)
except cinder_exception.NotFound:
return
time.sleep(consts.VOLUME_SCAN_TIME_DELAY)
# If the volume is not deleted, raise an exception
msg_ft = _LE("Timed out while waiting for volume. "
"Expected Volume: {0}, "
"Expected State: {1}, "
"Elapsed Time: {2}").format(volume,
None,
time.time() - start_time)
raise exceptions.TimeoutException(msg_ft)
def delete(self, docker_volume_name):
cinder_volume, state = self._get_docker_volume(docker_volume_name)
LOG.info(_LI("Get docker volume {0} {1} with state "
"{2}").format(docker_volume_name, cinder_volume, state))
if state == ATTACH_TO_THIS:
link_path = self._get_connector().get_device_path(cinder_volume)
if not link_path or not os.path.exists(link_path):
msg = _LE(
"Could not find device link path for volume {0} {1} "
"in host").format(docker_volume_name, cinder_volume)
LOG.error(msg)
raise exceptions.FuxiException(msg)
devpath = os.path.realpath(link_path)
if not os.path.exists(devpath):
msg = _LE("Could not find device path for volume {0} {1} in "
"host").format(docker_volume_name, cinder_volume)
LOG.error(msg)
raise exceptions.FuxiException(msg)
mounter = mount.Mounter()
mps = mounter.get_mps_by_device(devpath)
ref_count = len(mps)
if ref_count > 0:
mountpoint = self._get_mountpoint(docker_volume_name)
if mountpoint in mps:
mounter.unmount(mountpoint)
self._clear_mountpoint(mountpoint)
# If this volume is still mounted on other mount point,
# then return.
if ref_count > 1:
return True
else:
return True
# Detach device from this server.
self._get_connector().disconnect_volume(cinder_volume)
available_volume = self.cinderclient.volumes.get(cinder_volume.id)
# If this volume is not used by other server any more,
# than delete it from Cinder.
if not available_volume.attachments:
msg = _LW("No other servers still use this volume {0} "
"{1} any more, so delete it from Cinder"
"").format(docker_volume_name, cinder_volume)
LOG.warn(msg)
self._delete_volume(available_volume)
return True
elif state == UNKNOWN:
return False
else:
msg = _LE("The volume {0} {1} state must be {2} when "
"remove it from this server, but current state "
"is {3}").format(docker_volume_name,
cinder_volume,
ATTACH_TO_THIS,
state)
LOG.error(msg)
raise exceptions.NotMatchedState(msg)
def list(self):
LOG.info(_LI("Start to retrieve all docker volumes from Cinder"))
docker_volumes = []
try:
search_opts = {'metadata': {consts.VOLUME_FROM: 'fuxi'}}
for vol in self.cinderclient.volumes.list(search_opts=search_opts):
docker_volume_name = vol.name
if not docker_volume_name or not vol.attachments:
continue
mountpoint = self._get_mountpoint(vol.name)
if self._check_attached_to_this(vol):
devpath = os.path.realpath(
self._get_connector().get_device_path(vol))
mps = mount.Mounter().get_mps_by_device(devpath)
mountpoint = mountpoint if mountpoint in mps else ''
docker_vol = {'Name': docker_volume_name,
'Mountpoint': mountpoint}
docker_volumes.append(docker_vol)
except cinder_exception.ClientException as e:
LOG.error(_LE("Retrieve volume list failed. Error: {0}").format(e))
raise
LOG.info(_LI("Retrieve docker volumes {0} from Cinder "
"successfully").format(docker_volumes))
return docker_volumes
def show(self, docker_volume_name):
cinder_volume, state = self._get_docker_volume(docker_volume_name)
LOG.info(_LI("Get docker volume {0} {1} with state "
"{2}").format(docker_volume_name, cinder_volume, state))
if state == ATTACH_TO_THIS:
devpath = os.path.realpath(
self._get_connector().get_device_path(cinder_volume))
mp = self._get_mountpoint(docker_volume_name)
LOG.info("Expected devpath: {0} and mountpoint: {1} for volume: "
"{2} {3}".format(devpath, mp, docker_volume_name,
cinder_volume))
mounter = mount.Mounter()
return {"Name": docker_volume_name,
"Mountpoint": mp if mp in mounter.get_mps_by_device(
devpath) else ''}
elif state == UNKNOWN:
msg = _LW("Can't find this volume '{0}' in "
"Cinder").format(docker_volume_name)
LOG.warn(msg)
raise exceptions.NotFound(msg)
else:
msg = _LE("Volume '{0}' exists, but not attached to this volume,"
"and current state is {1}").format(docker_volume_name,
state)
raise exceptions.NotMatchedState(msg)
def mount(self, docker_volume_name):
cinder_volume, state = self._get_docker_volume(docker_volume_name)
LOG.info(_LI("Get docker volume {0} {1} with state "
"{2}").format(docker_volume_name, cinder_volume, state))
if state != ATTACH_TO_THIS:
msg = _("Volume {0} is not in correct state, current state "
"is {1}").format(docker_volume_name, state)
LOG.error(msg)
raise exceptions.FuxiException(msg)
connector = self._get_connector()
link_path = connector.get_device_path(cinder_volume)
if not os.path.exists(link_path):
LOG.warn(_LW("Could not find device link file, "
"so rebuild it"))
connector.disconnect_volume(cinder_volume)
connector.connect_volume(cinder_volume)
devpath = os.path.realpath(link_path)
if not devpath or not os.path.exists(devpath):
msg = _("Can't find volume device path")
LOG.error(msg)
raise exceptions.FuxiException(msg)
mountpoint = self._get_mountpoint(docker_volume_name)
self._create_mountpoint(mountpoint)
fstype = cinder_volume.metadata.get('fstype', cinder_conf.fstype)
mount.do_mount(devpath, mountpoint, fstype)
return mountpoint
def unmount(self, docker_volume_name):
return
def check_exist(self, docker_volume_name):
_, state = self._get_docker_volume(docker_volume_name)
LOG.info(_LI("Get docker volume {0} with state "
"{1}").format(docker_volume_name, state))
if state == UNKNOWN:
return False
return True

View File

@ -0,0 +1,114 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import os
import six
from fuxi import exceptions
from fuxi.i18n import _LI, _LE
from fuxi import utils
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Provider(object):
"""Base class for each volume provider.
Provider provider some operation related with Docker volume provider by
each backend volume provider, like Cinder.
"""
volume_provider_type = None
def __init__(self):
pass
@abc.abstractmethod
def create(self, docker_volume_name, volume_opts):
pass
@abc.abstractmethod
def delete(self, docker_volume_name):
pass
@abc.abstractmethod
def list(self):
pass
@abc.abstractmethod
def show(self, docker_volume_name):
pass
@abc.abstractmethod
def mount(self, docker_volume_name):
pass
@abc.abstractmethod
def unmount(self, docker_volume_name):
pass
@abc.abstractmethod
def check_exist(self, docker_volume_name):
pass
def _get_mountpoint(self, docker_volume_name):
"""Generate a mount point for volume.
:param docker_volume_name:
:rtype: str
"""
if not docker_volume_name:
LOG.error(_LE("Volume name could not be None"))
raise exceptions.FuxiException("Volume name could not be None")
if self.volume_provider_type:
return os.path.join(CONF.volume_dir,
self.volume_provider_type,
docker_volume_name)
else:
return os.path.join(CONF.volume_dir,
docker_volume_name)
def _create_mountpoint(self, mountpoint):
"""Create mount point directory for Docker volume.
:param mountpoint: The path of Docker volume.
"""
try:
if not os.path.exists(mountpoint) or not os.path.isdir(mountpoint):
utils.execute('mkdir', '-p', '-m=755', mountpoint,
run_as_root=True)
LOG.info(_LI("Create mountpoint %s successfully"), mountpoint)
except processutils.ProcessExecutionError as e:
LOG.error(_LE("Error happened when create volume "
"directory. Error: %s"), e)
raise
def _clear_mountpoint(self, mountpoint):
"""Clear mount point directory if it wouldn't used any more.
:param mountpoint: The path of Docker volume.
"""
if os.path.exists(mountpoint) and os.path.isdir(mountpoint):
try:
utils.execute('rm', '-r', mountpoint, run_as_root=True)
LOG.info(_LI("Clear mountpoint %s successfully"), mountpoint)
except processutils.ProcessExecutionError as e:
LOG.error(_LE("Error happened when clear mountpoint {0}"), e)
raise

View File

@ -2,4 +2,20 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
pbr>=1.6 # Apache-2.0
pbr>=1.6 # Apache-2.0
pytz>=2013.6 # MIT
Babel>=2.3.4 # BSD
Flask>=0.10,!=0.11,<1.0 # BSD
keystoneauth1>=2.7.0 # Apache-2.0
oslo.rootwrap>=2.0.0 # Apache-2.0
oslo.concurrency>=3.8.0 # Apache-2.0
oslo.config>=3.12.0 # Apache-2.0
oslo.i18n>=2.1.0 # Apache-2.0
oslo.log>=1.14.0 # Apache-2.0
oslo.utils>=3.15.0 # Apache-2.0
os-brick>=1.3.0,!=1.4.0 # Apache-2.0
python-cinderclient>=1.6.0,!=1.7.0,!=1.7.1 # Apache-2.0
python-novaclient>=2.29.0,!=2.33.0 # Apache-2.0
python-keystoneclient>=1.7.0,!=1.8.0,!=2.1.0 # Apache-2.0
requests>=2.10.0 # Apache-2.0
six>=1.9.0 # MIT

View File

@ -19,9 +19,25 @@ classifier =
Programming Language :: Python :: 3.3
Programming Language :: Python :: 3.4
[entry_points]
oslo.config.opts =
fuxi = fuxi.opts:list_fuxi_opts
console_scripts =
fuxi-server = fuxi.server:start
fuxi-rootwrap = oslo_rootwrap.cmd:main
[files]
packages =
fuxi
data_files =
/etc/fuxi =
etc/fuxi.conf
etc/rootwrap.conf
/etc/fuxi/rootwrap.d =
etc/rootwrap.d/fuxi.filters
/usr/lib/docker/plugins/fuxi =
etc/fuxi.json
[build_sphinx]
source-dir = doc/source

36
tox.ini
View File

@ -1,60 +1,40 @@
[tox]
minversion = 2.0
envlist = py34-constraints,py27-constraints,pypy-constraints,pep8-constraints
envlist = py34,py27,pep8
skipsdist = True
[testenv]
usedevelop = True
install_command =
constraints: {[testenv:common-constraints]install_command}
pip install -U {opts} {packages}
install_command = pip install -U {opts} {packages}
setenv =
VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/test-requirements.txt
commands = python setup.py testr --slowest --testr-args='{posargs}'
[testenv:common-constraints]
install_command = pip install -c{env:UPPER_CONSTRAINTS_FILE:https://git.openstack.org/cgit/openstack/requirements/plain/upper-constraints.txt} {opts} {packages}
[testenv:pep8]
commands = flake8 {posargs}
[testenv:pep8-constraints]
install_command = {[testenv:common-constraints]install_command}
commands = flake8 {posargs}
[testenv:venv]
commands = {posargs}
[testenv:venv-constraints]
install_command = {[testenv:common-constraints]install_command}
commands = {posargs}
[testenv:cover]
commands = python setup.py test --coverage --testr-args='{posargs}'
[testenv:cover-constraints]
install_command = {[testenv:common-constraints]install_command}
commands = python setup.py test --coverage --testr-args='{posargs}'
[testenv:docs]
commands = python setup.py build_sphinx
[testenv:docs-constraints]
install_command = {[testenv:common-constraints]install_command}
commands = python setup.py build_sphinx
[testenv:debug]
commands = oslo_debug_helper {posargs}
[testenv:debug-constraints]
install_command = {[testenv:common-constraints]install_command}
commands = oslo_debug_helper {posargs}
[flake8]
# E123, E125 skipped as they are invalid PEP-8.
show-source = True
ignore = E123,E125
builtins = _
exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build
[hacking]
import_exceptions = fuxi.i18n, fuxi.tests
[testenv:genconfig]
commands = oslo-config-generator --config-file=etc/fuxi-config-generator.conf