Browse Source

Merge "Splits service_instance module from generic driver"

changes/39/79739/12
Jenkins 8 years ago
committed by Gerrit Code Review
parent
commit
6849850945
  1. 4
      manila/exception.py
  2. 19
      manila/network/neutron/api.py
  3. 570
      manila/share/drivers/generic.py
  4. 578
      manila/share/drivers/service_instance.py
  5. 1
      manila/tests/conf_fixture.py
  6. 657
      manila/tests/test_service_instance.py
  7. 619
      manila/tests/test_share_generic.py

4
manila/exception.py

@ -537,3 +537,7 @@ class InstanceNotFound(NotFound):
class BridgeDoesNotExist(ManilaException):
message = _("Bridge %(bridge)s does not exist.")
class ServiceInstanceException(ManilaException):
message = _("Exception in service instance manager occurred.")

19
manila/network/neutron/api.py

@ -144,6 +144,13 @@ class API(base.Base):
raise exception.NetworkException(code=e.status_code,
message=e.message)
def delete_subnet(self, subnet_id):
try:
self.client.delete_subnet(subnet_id)
except neutron_client_exc.NeutronClientException as e:
raise exception.NetworkException(code=e.status_code,
message=e.message)
def list_ports(self, **search_opts):
"""List ports for the client based on search options."""
return self.client.list_ports(**search_opts).get('ports')
@ -233,6 +240,18 @@ class API(base.Base):
raise exception.NetworkException(code=e.status_code,
message=e.message)
def router_remove_interface(self, router_id, subnet_id, port_id=None):
body = {}
if subnet_id:
body['subnet_id'] = subnet_id
if port_id:
body['port_id'] = port_id
try:
self.client.remove_interface_router(router_id, body)
except neutron_client_exc.NeutronClientException as e:
raise exception.NetworkException(code=e.status_code,
message=e.message)
def router_list(self):
try:
return self.client.list_routers().get('routers', {})

570
manila/share/drivers/generic.py

@ -1,4 +1,4 @@
# Copyright 2014 Mirantis Inc.
# Copyright (c) 2014 NetApp, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -12,96 +12,55 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Generic Driver for shares.
"""
"""Generic Driver for shares."""
import ConfigParser
import netaddr
import os
import re
import shutil
import socket
import threading
import time
from oslo.config import cfg
from manila import compute
from manila import context
from manila import exception
from manila.network.linux import ip_lib
from manila.network.neutron import api as neutron
from manila.openstack.common import importutils
from manila.openstack.common import log as logging
from manila.share import driver
from manila import utils
from manila.share.drivers import service_instance
from manila import volume
from oslo.config import cfg
LOG = logging.getLogger(__name__)
share_opts = [
cfg.StrOpt('service_image_name',
default='manila-service-image',
help="Name of image in glance, that will be used to create "
"service instance"),
cfg.StrOpt('smb_template_config_path',
default='$state_path/smb.conf',
help="Path to smb config"),
help="Path to smb config."),
cfg.StrOpt('service_instance_name_template',
default='manila_service_instance-%s',
help="Name of service instance"),
cfg.StrOpt('service_instance_user',
help="User in service instance"),
cfg.StrOpt('service_instance_password',
default=None,
help="Password to service instance user"),
help="Name of service instance."),
cfg.StrOpt('volume_name_template',
default='manila-share-%s',
help="Volume name template"),
cfg.StrOpt('manila_service_keypair_name',
default='manila-service',
help="Name of keypair that will be created and used "
"for service instance"),
cfg.StrOpt('path_to_public_key',
default='/home/stack/.ssh/id_rsa.pub',
help="Path to hosts public key"),
cfg.StrOpt('path_to_private_key',
default='/home/stack/.ssh/id_rsa',
help="Path to hosts private key"),
help="Volume name template."),
cfg.StrOpt('volume_snapshot_name_template',
default='manila-snapshot-%s',
help="Volume snapshot name template"),
cfg.IntOpt('max_time_to_build_instance',
default=300,
help="Maximum time to wait for creating service instance"),
help="Volume snapshot name template."),
cfg.StrOpt('share_mount_path',
default='/shares',
help="Parent path in service instance where shares "
"will be mounted"),
"will be mounted."),
cfg.IntOpt('max_time_to_create_volume',
default=180,
help="Maximum time to wait for creating cinder volume"),
help="Maximum time to wait for creating cinder volume."),
cfg.IntOpt('max_time_to_attach',
default=120,
help="Maximum time to wait for attaching cinder volume"),
cfg.IntOpt('service_instance_flavor_id',
default=100,
help="ID of flavor, that will be used for service instance "
"creation"),
help="Maximum time to wait for attaching cinder volume."),
cfg.StrOpt('service_instance_smb_config_path',
default='$share_mount_path/smb.conf',
help="Path to smb config in service instance"),
cfg.StrOpt('service_network_name',
default='manila_service_network',
help="Name of manila service network"),
cfg.StrOpt('service_network_cidr',
default='10.254.0.0/16',
help="CIDR of manila service network"),
cfg.StrOpt('interface_driver',
default='manila.network.linux.interface.OVSInterfaceDriver',
help="Vif driver"),
help="Path to smb config in service instance."),
cfg.ListOpt('share_helpers',
default=[
'CIFS=manila.share.drivers.generic.CIFSHelper',
@ -113,31 +72,8 @@ share_opts = [
CONF = cfg.CONF
CONF.register_opts(share_opts)
def synchronized(f):
"""Decorates function with unique locks for each share network."""
def wrapped_func(self, *args, **kwargs):
for arg in args:
share_network_id = getattr(arg, 'share_network_id', None)
if isinstance(arg, dict):
share_network_id = arg.get('share_network_id', None)
if share_network_id:
break
else:
raise exception.\
ManilaException(_('Could not get share network id'))
with self.share_networks_locks.setdefault(share_network_id,
threading.RLock()):
return f(self, *args, **kwargs)
return wrapped_func
def _ssh_exec(server, command):
"""Executes ssh commands and checks/restores ssh connection."""
if not server['ssh'].get_transport().is_active():
server['ssh_pool'].remove(server['ssh'])
server['ssh'] = server['ssh_pool'].create()
return utils.ssh_execute(server['ssh'], ' '.join(command))
_ssh_exec = service_instance._ssh_exec
synchronized = service_instance.synchronized
class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
@ -149,59 +85,31 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
self.admin_context = context.get_admin_context()
self.db = db
self.configuration.append_config_values(share_opts)
self._helpers = None
self._helpers = {}
def check_for_setup_error(self):
"""Returns an error if prerequisites aren't met."""
if not self.configuration.service_instance_user:
raise exception.ManilaException(_('Service instance user is not '
'specified'))
pass
def do_setup(self, context):
"""Any initialization the generic driver does while starting."""
super(GenericShareDriver, self).do_setup(context)
self.compute_api = compute.API()
self.volume_api = volume.API()
self.neutron_api = neutron.API()
self.share_networks_locks = {}
self.share_networks_servers = {}
attempts = 5
while attempts:
try:
self.service_tenant_id = self.neutron_api.admin_tenant_id
break
except exception.NetworkException:
LOG.debug(_('Connection to neutron failed'))
attempts -= 1
time.sleep(3)
else:
raise exception.\
ManilaException(_('Can not receive service tenant id'))
self.service_network_id = self._get_service_network()
self.vif_driver = importutils.\
import_class(self.configuration.interface_driver)()
self._setup_connectivity_with_service_instances()
self.service_instance_manager = service_instance.\
ServiceInstanceManager(self.db, self._helpers)
self.get_service_instance = self.service_instance_manager.\
get_service_instance
self.share_networks_locks = self.service_instance_manager.\
share_networks_locks
self.share_networks_servers = self.service_instance_manager.\
share_networks_servers
self._setup_helpers()
def _get_service_network(self):
"""Finds existing or creates new service network."""
service_network_name = self.configuration.service_network_name
networks = [network for network in self.neutron_api.
get_all_tenant_networks(self.service_tenant_id)
if network['name'] == service_network_name]
if len(networks) > 1:
raise exception.ManilaException(_('Ambiguous service networks'))
elif not networks:
return self.neutron_api.network_create(self.service_tenant_id,
service_network_name)['id']
else:
return networks[0]['id']
def _setup_helpers(self):
"""Initializes protocol-specific NAS drivers."""
self._helpers = {}
for helper_str in self.configuration.share_helpers:
share_proto, _, import_str = helper_str.partition('=')
share_proto, __, import_str = helper_str.partition('=')
helper = importutils.import_class(import_str)
self._helpers[share_proto.upper()] = helper(self._execute,
self.configuration,
@ -210,9 +118,10 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
def create_share(self, context, share):
"""Creates share."""
if share['share_network_id'] is None:
raise exception.\
ManilaException(_('Share Network is not specified'))
server = self._get_service_instance(self.admin_context, share)
raise exception.ManilaException(
_('Share Network is not specified'))
server = self.get_service_instance(self.admin_context,
share_network_id=share['share_network_id'])
volume = self._allocate_container(context, share)
volume = self._attach_volume(context, share, server, volume)
self._format_device(server, volume)
@ -314,8 +223,8 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
if len(volume_snapshot_list) == 1:
volume_snapshot = volume_snapshot_list[0]
elif len(volume_snapshot_list) > 1:
raise exception.\
ManilaException(_('Error. Ambiguous volume snaphots'))
raise exception.ManilaException(
_('Error. Ambiguous volume snaphots'))
return volume_snapshot
@synchronized
@ -341,382 +250,16 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
% self.configuration.max_time_to_attach)
def _get_device_path(self, context, server):
"""Returns device path, that will be used for cinder volume attaching.
"""
"""Returns device path for cinder volume attaching."""
volumes = self.compute_api.instance_volumes_list(context, server['id'])
used_literals = set(volume.device[-1] for volume in volumes
if '/dev/vd' in volume.device)
lit = 'b'
while lit in used_literals:
lit = chr(ord(lit) + 1)
device_name = '/dev/vd' + lit
device_name = '/dev/vd%s' % lit
return device_name
def _get_service_instance_name(self, share):
"""Returns service vms name."""
return self.configuration.service_instance_name_template % \
share['share_network_id']
def _get_server_ip(self, server):
"""Returns service vms ip address."""
net = server['networks']
try:
net_ips = net[self.configuration.service_network_name]
return net_ips[0]
except KeyError:
msg = _('Service vm is not attached to %s network')
except IndexError:
msg = _('Service vm has no ips on %s network')
msg = msg % self.configuration.service_network_name
LOG.error(msg)
raise exception.ManilaException(msg)
def _ensure_or_delete_server(self, context, server, update=False):
"""Ensures that server exists and active, otherwise deletes it."""
if update:
try:
server.update(self.compute_api.server_get(context,
server['id']))
except exception.InstanceNotFound as e:
LOG.debug(e)
return False
if server['status'] == 'ACTIVE':
if self._check_server_availability(server):
return True
self._delete_server(context, server)
return False
def _delete_server(self, context, server):
"""Deletes the server."""
self.compute_api.server_delete(context, server['id'])
t = time.time()
while time.time() - t < self.configuration.\
max_time_to_build_instance:
try:
server = self.compute_api.server_get(context,
server['id'])
except exception.InstanceNotFound:
LOG.debug(_('Service instance was deleted succesfully'))
break
time.sleep(1)
else:
raise exception.ManilaException(_('Instance have not been deleted '
'in %ss. Giving up') %
self.configuration.max_time_to_build_instance)
@synchronized
def _get_service_instance(self, context, share, create=True):
"""Finds or creates and setups service vm."""
server = self.share_networks_servers.get(share['share_network_id'], {})
old_server_ip = server.get('ip', None)
if server and self._ensure_or_delete_server(context,
server,
update=True):
return server
else:
server = {}
service_instance_name = self._get_service_instance_name(share)
search_opts = {'name': service_instance_name}
servers = self.compute_api.server_list(context, search_opts, True)
if len(servers) == 1:
server = servers[0]
server['ip'] = self._get_server_ip(server)
old_server_ip = server['ip']
if not self._ensure_or_delete_server(context, server):
server.clear()
elif len(servers) > 1:
raise exception.\
ManilaException(_('Ambiguous service instances'))
if not server and create:
server = self._create_service_instance(context,
service_instance_name,
share, old_server_ip)
if server:
server['share_network_id'] = share['share_network_id']
server['ip'] = self._get_server_ip(server)
server['ssh_pool'] = self._get_ssh_pool(server)
server['ssh'] = server['ssh_pool'].create()
for helper in self._helpers.values():
helper.init_helper(server)
self.share_networks_servers[share['share_network_id']] = server
return server
def _get_ssh_pool(self, server):
"""Returns ssh connection pool for service vm."""
ssh_pool = utils.SSHPool(server['ip'], 22, None,
self.configuration.service_instance_user,
password=self.configuration.service_instance_password,
privatekey=self.configuration.path_to_private_key,
max_size=1)
return ssh_pool
def _get_key(self, context):
"""Returns name of key, that will be injected to service vm."""
if not self.configuration.path_to_public_key or \
not self.configuration.path_to_private_key:
return
path_to_public_key = \
os.path.expanduser(self.configuration.path_to_public_key)
path_to_private_key = \
os.path.expanduser(self.configuration.path_to_private_key)
if not os.path.exists(path_to_public_key) or \
not os.path.exists(path_to_private_key):
return
keypair_name = self.configuration.manila_service_keypair_name
keypairs = [k for k in self.compute_api.keypair_list(context)
if k.name == keypair_name]
if len(keypairs) > 1:
raise exception.ManilaException(_('Ambiguous keypairs'))
public_key, _ = self._execute('cat',
path_to_public_key,
run_as_root=True)
if not keypairs:
keypair = self.compute_api.keypair_import(context, keypair_name,
public_key)
else:
keypair = keypairs[0]
if keypair.public_key != public_key:
LOG.debug('Public key differs from existing keypair. '
'Creating new keypair')
self.compute_api.keypair_delete(context, keypair.id)
keypair = self.compute_api.keypair_import(context,
keypair_name,
public_key)
return keypair.name
def _get_service_image(self, context):
"""Returns ID of service image, that will be used for service vm
creating.
"""
images = [image.id for image in self.compute_api.image_list(context)
if image.name == self.configuration.service_image_name]
if len(images) == 1:
return images[0]
elif not images:
raise exception.\
ManilaException(_('No appropriate image was found'))
else:
raise exception.ManilaException(_('Ambiguous image name'))
def _create_service_instance(self, context, instance_name, share,
old_server_ip):
"""Creates service vm and sets up networking for it."""
service_image_id = self._get_service_image(context)
key_name = self._get_key(context)
if not self.configuration.service_instance_password and not key_name:
raise exception.ManilaException(_('Neither service instance'
'password nor key are available'))
port = self._setup_network_for_instance(context, share, old_server_ip)
try:
self._setup_connectivity_with_service_instances()
except Exception as e:
LOG.debug(e)
self.neutron_api.delete_port(port['id'])
raise
service_instance = self.compute_api.server_create(context,
instance_name, service_image_id,
self.configuration.service_instance_flavor_id,
key_name, None, None,
nics=[{'port-id': port['id']}])
t = time.time()
while time.time() - t < self.configuration.max_time_to_build_instance:
if service_instance['status'] == 'ACTIVE':
break
if service_instance['status'] == 'ERROR':
raise exception.ManilaException(_('Failed to build service '
'instance'))
time.sleep(1)
try:
service_instance = self.compute_api.server_get(context,
service_instance['id'])
except exception.InstanceNotFound as e:
LOG.debug(e)
else:
raise exception.ManilaException(_('Instance have not been spawned '
'in %ss. Giving up') %
self.configuration.max_time_to_build_instance)
service_instance['ip'] = self._get_server_ip(service_instance)
if not self._check_server_availability(service_instance):
raise exception.ManilaException(_('SSH connection have not been '
'established in %ss. Giving up')
% self.configuration.max_time_to_build_instance)
return service_instance
def _check_server_availability(self, server):
t = time.time()
while time.time() - t < self.configuration.max_time_to_build_instance:
LOG.debug('Checking service vm availablity')
try:
socket.socket().connect((server['ip'], 22))
LOG.debug(_('Service vm is available via ssh.'))
return True
except socket.error as e:
LOG.debug(e)
LOG.debug(_('Server is not available through ssh. Waiting...'))
time.sleep(5)
return False
def _setup_network_for_instance(self, context, share, old_server_ip):
"""Setups network for service vm."""
service_network = self.neutron_api.get_network(self.service_network_id)
all_service_subnets = [self.neutron_api.get_subnet(subnet_id)
for subnet_id in service_network['subnets']]
service_subnets = [subnet for subnet in all_service_subnets
if subnet['name'] == share['share_network_id']]
if len(service_subnets) > 1:
raise exception.ManilaException(_('Ambiguous subnets'))
elif not service_subnets:
service_subnet = \
self.neutron_api.subnet_create(self.service_tenant_id,
self.service_network_id,
share['share_network_id'],
self._get_cidr_for_subnet(all_service_subnets))
else:
service_subnet = service_subnets[0]
share_network = self.db.share_network_get(context,
share['share_network_id'])
private_router = self._get_private_router(share_network)
try:
self.neutron_api.router_add_interface(private_router['id'],
service_subnet['id'])
except exception.NetworkException as e:
if 'already has' not in e.msg:
raise
LOG.debug(_('Subnet %(subnet_id)s is already attached to the '
'router %(router_id)s') %
{'subnet_id': service_subnet['id'],
'router_id': private_router['id']})
return self.neutron_api.create_port(self.service_tenant_id,
self.service_network_id,
subnet_id=service_subnet['id'],
fixed_ip=old_server_ip,
device_owner='manila')
def _get_private_router(self, share_network):
"""Returns router attached to private subnet gateway."""
private_subnet = self.neutron_api.\
get_subnet(share_network['neutron_subnet_id'])
if not private_subnet['gateway_ip']:
raise exception.ManilaException(_('Subnet must have gateway'))
private_network_ports = [p for p in self.neutron_api.list_ports(
network_id=share_network['neutron_net_id'])]
for p in private_network_ports:
fixed_ip = p['fixed_ips'][0]
if fixed_ip['subnet_id'] == private_subnet['id'] and \
fixed_ip['ip_address'] == private_subnet['gateway_ip']:
private_subnet_gateway_port = p
break
else:
raise exception.ManilaException(_('Subnet gateway is not attached '
'the router'))
private_subnet_router = self.neutron_api.show_router(
private_subnet_gateway_port['device_id'])
return private_subnet_router
def _setup_connectivity_with_service_instances(self):
"""Setups connectivity with service instances by creating port
in service network, creating and setting up required network devices.
"""
port = self._setup_service_port()
interface_name = self.vif_driver.get_device_name(port)
self.vif_driver.plug(port['id'], interface_name, port['mac_address'])
ip_cidrs = []
for fixed_ip in port['fixed_ips']:
subnet = self.neutron_api.get_subnet(fixed_ip['subnet_id'])
net = netaddr.IPNetwork(subnet['cidr'])
ip_cidr = '%s/%s' % (fixed_ip['ip_address'], net.prefixlen)
ip_cidrs.append(ip_cidr)
self.vif_driver.init_l3(interface_name, ip_cidrs)
# ensure that interface is first in the list
device = ip_lib.IPDevice(interface_name)
device.route.pullup_route(interface_name)
# here we are checking for garbage devices from removed service port
self._clean_garbage(device)
def _clean_garbage(self, device):
"""Finds and removes network device, that was associated with deleted
service port.
"""
list_dev = []
for dev in ip_lib.IPWrapper().get_devices():
if dev.name != device.name and dev.name[:3] == device.name[:3]:
cidr_set = set()
for a in dev.addr.list():
if a['ip_version'] == 4:
cidr_set.add(str(netaddr.IPNetwork(a['cidr']).cidr))
list_dev.append((dev.name, cidr_set))
device_cidr_set = set(str(netaddr.IPNetwork(a['cidr']).cidr)
for a in device.addr.list()
if a['ip_version'] == 4)
for dev_name, cidr_set in list_dev:
if device_cidr_set & cidr_set:
self.vif_driver.unplug(dev_name)
def _setup_service_port(self):
"""Find or creates neutron port, that will be used for connectivity
with service instances.
"""
ports = [port for port in self.neutron_api.
list_ports(device_id='manila-share')]
if len(ports) > 1:
raise exception.\
ManilaException(_('Error. Ambiguous service ports'))
elif not ports:
try:
stdout, stderr = self._execute('hostname')
host = stdout.strip()
except exception.ProcessExecutionError as e:
msg = _('Unable to get host. %s') % e.stderr
raise exception.ManilaException(msg)
port = self.neutron_api.create_port(self.service_tenant_id,
self.service_network_id,
device_id='manila-share',
device_owner='manila:generic_driver',
host_id=host)
else:
port = ports[0]
network = self.neutron_api.get_network(self.service_network_id)
subnets = set(network['subnets'])
port_fixed_ips = []
for fixed_ip in port['fixed_ips']:
port_fixed_ips.append({'subnet_id': fixed_ip['subnet_id'],
'ip_address': fixed_ip['ip_address']})
if fixed_ip['subnet_id'] in subnets:
subnets.remove(fixed_ip['subnet_id'])
# If there are subnets here that means that
# we need to add those to the port and call update.
if subnets:
port_fixed_ips.extend([dict(subnet_id=s) for s in subnets])
port = self.neutron_api.update_port_fixed_ips(
port['id'], {'fixed_ips': port_fixed_ips})
return port
def _get_cidr_for_subnet(self, subnets):
"""Returns not used cidr for service subnet creating."""
used_cidrs = set(subnet['cidr'] for subnet in subnets)
serv_cidr = netaddr.IPNetwork(self.configuration.service_network_cidr)
for subnet in serv_cidr.subnet(29):
cidr = str(subnet.cidr)
if cidr not in used_cidrs:
return cidr
else:
raise exception.ManilaException(_('No available cidrs'))
def _allocate_container(self, context, share, snapshot=None):
"""Creates cinder volume, associated to share by name."""
volume_snapshot = None
@ -748,8 +291,8 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
if volume:
self.volume_api.delete(context, volume['id'])
t = time.time()
while time.time() - t < self.configuration.\
max_time_to_create_volume:
while (time.time() - t <
self.configuration.max_time_to_create_volume):
try:
volume = self.volume_api.get(context, volume['id'])
except exception.VolumeNotFound:
@ -785,15 +328,16 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
data['total_capacity_gb'] = 'infinite'
data['free_capacity_gb'] = 'infinite'
data['reserved_percentage'] = \
self.configuration.reserved_share_percentage
data['reserved_percentage'] = (self.configuration.
reserved_share_percentage)
data['QoS_support'] = False
self._stats = data
def create_share_from_snapshot(self, context, share, snapshot):
"""Is called to create share from snapshot."""
server = self._get_service_instance(self.admin_context, share)
server = self.get_service_instance(self.admin_context,
share_network_id=share['share_network_id'])
volume = self._allocate_container(context, share, snapshot)
volume = self._attach_volume(context, share, server, volume)
self._mount_device(context, share, server, volume)
@ -805,8 +349,9 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
"""Deletes share."""
if not share['share_network_id']:
return
server = self._get_service_instance(self.admin_context,
share, create=False)
server = self.get_service_instance(self.admin_context,
share_network_id=share['share_network_id'],
create=False)
if server:
self._get_helper(share).remove_export(server, share['name'])
self._unmount_device(context, share, server)
@ -816,8 +361,8 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
def create_snapshot(self, context, snapshot):
"""Creates a snapshot."""
volume = self._get_volume(context, snapshot['share_id'])
volume_snapshot_name = self.configuration.\
volume_snapshot_name_template % snapshot['id']
volume_snapshot_name = (self.configuration.
volume_snapshot_name_template % snapshot['id'])
volume_snapshot = self.volume_api.create_snapshot_force(context,
volume['id'],
volume_snapshot_name,
@ -859,7 +404,8 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
def ensure_share(self, context, share):
"""Ensure that storage are mounted and exported."""
server = self._get_service_instance(context, share)
server = self.get_service_instance(context,
share_network_id=share['share_network_id'])
volume = self._get_volume(context, share['id'])
volume = self._attach_volume(context, share, server, volume)
self._mount_device(context, share, server, volume)
@ -867,9 +413,9 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
def allow_access(self, context, share, access):
"""Allow access to the share."""
server = self._get_service_instance(self.admin_context,
share,
create=False)
server = self.get_service_instance(self.admin_context,
share_network_id=share['share_network_id'],
create=False)
if not server:
raise exception.ManilaException('Server not found. Try to '
'restart manila share service')
@ -881,9 +427,9 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
"""Deny access to the share."""
if not share['share_network_id']:
return
server = self._get_service_instance(self.admin_context,
share,
create=False)
server = self.get_service_instance(self.admin_context,
share_network_id=share['share_network_id'],
create=False)
if server:
self._get_helper(share).deny_access(server, share['name'],
access['access_type'],
@ -1009,8 +555,7 @@ class CIFSHelper(NASHelperBase):
local_config = self._create_local_config(server['share_network_id'])
config_dir = os.path.dirname(self.config_path)
try:
_ssh_exec(server, ['sudo', 'mkdir',
config_dir])
_ssh_exec(server, ['sudo', 'mkdir', config_dir])
except exception.ProcessExecutionError as e:
if 'File exists' not in e.stderr:
raise
@ -1068,12 +613,12 @@ class CIFSHelper(NASHelperBase):
self._update_config(parser, config)
self._write_remote_config(config, server)
_ssh_exec(server, ['sudo', 'smbcontrol', 'all', 'close-share',
share_name])
share_name])
@synchronized
def _write_remote_config(self, config, server):
with open(config, 'r') as f:
cfg = "'" + f.read() + "'"
cfg = "'%s'" % f.read()
_ssh_exec(server, ['echo %s > %s' % (cfg, self.config_path)])
def allow_access(self, server, share_name, access_type, access):
@ -1131,8 +676,7 @@ class CIFSHelper(NASHelperBase):
#Check that configuration is correct
with open(self.test_config, 'w') as fp:
parser.write(fp)
self._execute('testparm', '-s', self.test_config,
check_exit_code=True)
self._execute('testparm', '-s', self.test_config, check_exit_code=True)
#save it
with open(config, 'w') as fp:
parser.write(fp)

578
manila/share/drivers/service_instance.py

@ -0,0 +1,578 @@
# Copyright (c) 2014 NetApp, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Module for managing nova instances for share drivers."""
import netaddr
import os
import socket
import threading
import time
from oslo.config import cfg
from manila import compute
from manila import context
from manila import exception
from manila.network.linux import ip_lib
from manila.network.neutron import api as neutron
from manila.openstack.common import importutils
from manila.openstack.common import log as logging
from manila import utils
LOG = logging.getLogger(__name__)
server_opts = [
cfg.StrOpt('service_image_name',
default='manila-service-image',
help="Name of image in glance, that will be used to create "
"service instance."),
cfg.StrOpt('service_instance_name_template',
default='manila_service_instance-%s',
help="Name of service instance."),
cfg.StrOpt('service_instance_user',
help="User in service instance."),
cfg.StrOpt('service_instance_password',
default=None,
help="Password to service instance user."),
cfg.StrOpt('manila_service_keypair_name',
default='manila-service',
help="Name of keypair that will be created and used "
"for service instance."),
cfg.StrOpt('path_to_public_key',
default='~/.ssh/id_rsa.pub',
help="Path to hosts public key."),
cfg.StrOpt('path_to_private_key',
default='~/.ssh/id_rsa',
help="Path to hosts private key."),
cfg.IntOpt('max_time_to_build_instance',
default=300,
help="Maximum time to wait for creating service instance."),
cfg.IntOpt('service_instance_flavor_id',
default=100,
help="ID of flavor, that will be used for service instance "
"creation."),
cfg.StrOpt('service_network_name',
default='manila_service_network',
help="Name of manila service network."),
cfg.StrOpt('service_network_cidr',
default='10.254.0.0/16',
help="CIDR of manila service network."),
cfg.StrOpt('interface_driver',
default='manila.network.linux.interface.OVSInterfaceDriver',
help="Vif driver."),
]
CONF = cfg.CONF
CONF.register_opts(server_opts)
def synchronized(f):
"""Decorates function with unique locks for each share network.
Share network id must be provided either as value/attribute
of one of args or as named argument.
"""
def wrapped_func(self, *args, **kwargs):
share_network_id = kwargs.get('share_network_id', None)
if not share_network_id:
for arg in args:
share_network_id = getattr(arg, 'share_network_id', None)
if isinstance(arg, dict):
share_network_id = arg.get('share_network_id', None)
if share_network_id:
break
else:
raise exception.ServiceInstanceException(_('Could not get '
'share network id'))
with self.share_networks_locks.setdefault(share_network_id,
threading.RLock()):
return f(self, *args, **kwargs)
return wrapped_func
def _ssh_exec(server, command):
"""Executes ssh commands and checks/restores ssh connection."""
if not server['ssh'].get_transport().is_active():
server['ssh_pool'].remove(server['ssh'])
server['ssh'] = server['ssh_pool'].create()
return utils.ssh_execute(server['ssh'], ' '.join(command))
class ServiceInstanceManager(object):
"""Manages nova instances for various share drivers."""
def __init__(self, db, _helpers, *args, **kwargs):
"""Do initialization."""
super(ServiceInstanceManager, self).__init__(*args, **kwargs)
if not CONF.service_instance_user:
raise exception.ServiceInstanceException(_('Service instance user '
'is not specified'))
self.admin_context = context.get_admin_context()
self._execute = utils.execute
self.compute_api = compute.API()
self.neutron_api = neutron.API()
self._helpers = _helpers
self.db = db
attempts = 5
while attempts:
try:
self.service_tenant_id = self.neutron_api.admin_tenant_id
break
except exception.NetworkException:
LOG.debug(_('Connection to neutron failed.'))
attempts -= 1
time.sleep(3)
else:
raise exception.ServiceInstanceException(_('Can not receive '
'service tenant id.'))
self.share_networks_locks = {}
self.share_networks_servers = {}
self.service_network_id = self._get_service_network()
self.vif_driver = importutils.import_class(CONF.interface_driver)()
self._setup_connectivity_with_service_instances()
def _get_service_network(self):
"""Finds existing or creates new service network."""
service_network_name = CONF.service_network_name
networks = [network for network in self.neutron_api.
get_all_tenant_networks(self.service_tenant_id)
if network['name'] == service_network_name]
if len(networks) > 1:
raise exception.ServiceInstanceException(_('Ambiguous service '
'networks.'))
elif not networks:
return self.neutron_api.network_create(self.service_tenant_id,
service_network_name)['id']
else:
return networks[0]['id']
def _get_service_instance_name(self, share_network_id):
"""Returns service vms name."""
return CONF.service_instance_name_template % share_network_id
def _get_server_ip(self, server):
"""Returns service vms ip address."""
net = server['networks']
try:
net_ips = net[CONF.service_network_name]
return net_ips[0]
except KeyError:
msg = _('Service vm is not attached to %s network.')
except IndexError:
msg = _('Service vm has no ips on %s network.')
msg = msg % CONF.service_network_name
LOG.error(msg)
raise exception.ServiceInstanceException(msg)
def _ensure_server(self, context, server, update=False):
"""Ensures that server exists and active, otherwise deletes it."""
if not server:
return False
if update:
try:
server.update(self.compute_api.server_get(context,
server['id']))
except exception.InstanceNotFound as e:
LOG.debug(e)
return False
if server['status'] == 'ACTIVE':
if self._check_server_availability(server):
return True
return False
def _delete_server(self, context, server):
"""Deletes the server."""
if not server:
return
self.compute_api.server_delete(context, server['id'])
t = time.time()
while time.time() - t < CONF.max_time_to_build_instance:
try:
server = self.compute_api.server_get(context, server['id'])
except exception.InstanceNotFound:
LOG.debug(_('Service instance was deleted succesfully.'))
break
time.sleep(1)
else:
raise exception.ServiceInstanceException(_('Instance have not '
'been deleted in %ss. Giving up.') %
CONF.max_time_to_build_instance)
@synchronized
def get_service_instance(self, context, share_network_id, create=True):
"""Finds or creates and sets up service vm."""
server = self.share_networks_servers.get(share_network_id, {})
old_server_ip = server.get('ip', None)
if self._ensure_server(context, server, update=True):
return server
else:
self._delete_server(context, server)
server = {}
service_instance_name = self._get_service_instance_name(
share_network_id)
search_opts = {'name': service_instance_name}
servers = self.compute_api.server_list(context, search_opts, True)
if len(servers) == 1:
server = servers[0]
server['ip'] = self._get_server_ip(server)
old_server_ip = server['ip']
if not self._ensure_server(context, server):
self._delete_server(context, server)
server.clear()
elif len(servers) > 1:
raise exception.ServiceInstanceException(
_('Error. Ambiguous service instances.'))
if not server and create:
server = self._create_service_instance(context,
service_instance_name,
share_network_id,
old_server_ip)
if server:
server['share_network_id'] = share_network_id
server['ip'] = self._get_server_ip(server)
server['ssh_pool'] = self._get_ssh_pool(server)
server['ssh'] = server['ssh_pool'].create()
for helper in self._helpers.values():
helper.init_helper(server)
self.share_networks_servers[share_network_id] = server
return server
def _get_ssh_pool(self, server):
"""Returns ssh connection pool for service vm."""
ssh_pool = utils.SSHPool(server['ip'], 22, None,
CONF.service_instance_user,
password=CONF.service_instance_password,
privatekey=CONF.path_to_private_key,
max_size=1)
return ssh_pool
def _get_key(self, context):
"""Returns name of key, that will be injected to service vm."""
if not CONF.path_to_public_key or not CONF.path_to_private_key:
return
path_to_public_key = os.path.expanduser(CONF.path_to_public_key)
path_to_private_key = os.path.expanduser(CONF.path_to_private_key)
if (not os.path.exists(path_to_public_key) or
not os.path.exists(path_to_private_key)):
return
keypair_name = CONF.manila_service_keypair_name
keypairs = [k for k in self.compute_api.keypair_list(context)
if k.name == keypair_name]
if len(keypairs) > 1:
raise exception.ServiceInstanceException(_('Ambiguous keypairs.'))
public_key, __ = self._execute('cat', path_to_public_key)
if not keypairs:
keypair = self.compute_api.keypair_import(context,
keypair_name,
public_key)
else:
keypair = keypairs[0]
if keypair.public_key != public_key:
LOG.debug(_('Public key differs from existing keypair. '
'Creating new keypair.'))
self.compute_api.keypair_delete(context, keypair.id)
keypair = self.compute_api.keypair_import(context,
keypair_name,
public_key)
return keypair.name
def _get_service_image(self, context):
"""Returns ID of service image for service vm creating."""
images = [image.id for image in self.compute_api.image_list(context)
if image.name == CONF.service_image_name]
if len(images) == 1:
return images[0]
elif not images:
raise exception.ServiceInstanceException(_('No appropriate '
'image was found.'))
else:
raise exception.ServiceInstanceException(
_('Ambiguous image name.'))
def _create_service_instance(self, context, instance_name,
share_network_id, old_server_ip):
"""Creates service vm and sets up networking for it."""
service_image_id = self._get_service_image(context)
key_name = self._get_key(context)
if not CONF.service_instance_password and not key_name:
raise exception.ServiceInstanceException(
_('Neither service instance password nor key are available.'))
port = self._setup_network_for_instance(context,
share_network_id,
old_server_ip)
try:
self._setup_connectivity_with_service_instances()
except Exception as e:
LOG.debug(e)
self.neutron_api.delete_port(port['id'])
raise
service_instance = self.compute_api.server_create(context,
instance_name,
service_image_id,
CONF.service_instance_flavor_id,
key_name,
None,
None,
nics=[{'port-id': port['id']}])
t = time.time()
while time.time() - t < CONF.max_time_to_build_instance:
if service_instance['status'] == 'ACTIVE':
break
if service_instance['status'] == 'ERROR':
raise exception.ServiceInstanceException(
_('Failed to build service instance.'))
time.sleep(1)
try:
service_instance = self.compute_api.server_get(context,
service_instance['id'])
except exception.InstanceNotFound as e:
LOG.debug(e)
else:
raise exception.ServiceInstanceException(
_('Instance have not been spawned in %ss. Giving up.') %
CONF.max_time_to_build_instance)
service_instance['ip'] = self._get_server_ip(service_instance)
if not self._check_server_availability(service_instance):
raise exception.ServiceInstanceException(
_('SSH connection have not been '
'established in %ss. Giving up.') %
CONF.max_time_to_build_instance)
return service_instance
def _check_server_availability(self, server):
t = time.time()
while time.time() - t < CONF.max_time_to_build_instance:
LOG.debug(_('Checking service vm availablity.'))
try:
socket.socket().connect((server['ip'], 22))
LOG.debug(_('Service vm is available via ssh.'))
return True
except socket.error as e:
LOG.debug(e)
LOG.debug(_('Server is not available through ssh. Waiting...'))
time.sleep(5)
return False
def _setup_network_for_instance(self, context, share_network_id,
old_server_ip):
"""Sets up network for service vm."""
service_subnet = self._get_service_subnet(share_network_id)
if not service_subnet:
service_subnet = self.neutron_api.subnet_create(
self.service_tenant_id,
self.service_network_id,
share_network_id,
self._get_cidr_for_subnet())
private_router = self._get_private_router(share_network_id)
try:
self.neutron_api.router_add_interface(private_router['id'],
service_subnet['id'])
except exception.NetworkException as e:
if e.kwargs['code'] != 400:
raise
LOG.debug(_('Subnet %(subnet_id)s is already attached to the '
'router %(router_id)s.') %
{'subnet_id': service_subnet['id'],
'router_id': private_router['id']})
return self.neutron_api.create_port(self.service_tenant_id,
self.service_network_id,
subnet_id=service_subnet['id'],
fixed_ip=old_server_ip,
device_owner='manila')
def _get_private_router(self, share_network_id):
"""Returns router attached to private subnet gateway."""
share_network = self.db.share_network_get(self.admin_context,
share_network_id)
private_subnet = self.neutron_api.get_subnet(
share_network['neutron_subnet_id'])
if not private_subnet['gateway_ip']:
raise exception.ServiceInstanceException(
_('Subnet must have gateway.'))
private_network_ports = [p for p in self.neutron_api.list_ports(
network_id=share_network['neutron_net_id'])]
for p in private_network_ports:
fixed_ip = p['fixed_ips'][0]
if (fixed_ip['subnet_id'] == private_subnet['id'] and
fixed_ip['ip_address'] == private_subnet['gateway_ip']):
private_subnet_gateway_port = p
break
else:
raise exception.ServiceInstanceException(
_('Subnet gateway is not attached the router.'))
private_subnet_router = self.neutron_api.show_router(
private_subnet_gateway_port['device_id'])
return private_subnet_router
def _setup_connectivity_with_service_instances(self):
"""Sets up connectivity with service instances.
Creates creating port in service network, creating and setting up
required network devices.
"""
port = self._get_service_port()
port = self._add_fixed_ips_to_service_port(port)
interface_name = self.vif_driver.get_device_name(port)
self.vif_driver.plug(port['id'], interface_name, port['mac_address'])
ip_cidrs = []
for fixed_ip in port['fixed_ips']:
subnet = self.neutron_api.get_subnet(fixed_ip['subnet_id'])
net = netaddr.IPNetwork(subnet['cidr'])
ip_cidr = '%s/%s' % (fixed_ip['ip_address'], net.prefixlen)
ip_cidrs.append(ip_cidr)
self.vif_driver.init_l3(interface_name, ip_cidrs)
# ensure that interface is first in the list
device = ip_lib.IPDevice(interface_name)
device.route.pullup_route(interface_name)
# here we are checking for garbage devices from removed service port
self._remove_outdated_interfaces(device)
def _remove_outdated_interfaces(self, device):
"""Finds and removes unused network device."""
list_dev = []
for dev in ip_lib.IPWrapper().get_devices():
if dev.name != device.name and dev.name[:3] == device.name[:3]:
cidr_set = set()
for a in dev.addr.list():
if a['ip_version'] == 4:
cidr_set.add(str(netaddr.IPNetwork(a['cidr']).cidr))
list_dev.append((dev.name, cidr_set))
device_cidr_set = set(str(netaddr.IPNetwork(a['cidr']).cidr)
for a in device.addr.list()
if a['ip_version'] == 4)
for dev_name, cidr_set in list_dev:
if device_cidr_set & cidr_set:
self.vif_driver.unplug(dev_name)
def _get_service_port(self):
"""Find or creates service neutron port.
This port will be used for connectivity with service instances.
"""
ports = [port for port in self.neutron_api.
list_ports(device_id='manila-share')]
if len(ports) > 1:
raise exception.ServiceInstanceException(
_('Error. Ambiguous service ports.'))
elif not ports:
try:
stdout, stderr = self._execute('hostname')
host = stdout.strip()
except exception.ProcessExecutionError as e:
msg = _('Unable to get host. %s') % e.stderr
raise exception.ManilaException(msg)
port = self.neutron_api.create_port(self.service_tenant_id,
self.service_network_id,
device_id='manila-share',
device_owner='manila:share',
host_id=host)
else:
port = ports[0]
return port
def _add_fixed_ips_to_service_port(self, port):
network = self.neutron_api.get_network(self.service_network_id)
subnets = set(network['subnets'])
port_fixed_ips = []
for fixed_ip in port['fixed_ips']:
port_fixed_ips.append({'subnet_id': fixed_ip['subnet_id'],
'ip_address': fixed_ip['ip_address']})
if fixed_ip['subnet_id'] in subnets:
subnets.remove(fixed_ip['subnet_id'])
# If there are subnets here that means that
# we need to add those to the port and call update.
if subnets:
port_fixed_ips.extend([dict(subnet_id=s) for s in subnets])
port = self.neutron_api.update_port_fixed_ips(
port['id'], {'fixed_ips': port_fixed_ips})
return port
def _remove_fixed_ip_from_service_port(self, port, subnet_id):
port_fixed_ips = []
for fixed_ip in port['fixed_ips']:
if fixed_ip['subnet_id'] == subnet_id:
continue
port_fixed_ips.append({'subnet_id': fixed_ip['subnet_id'],
'ip_address': fixed_ip['ip_address']})
if port_fixed_ips != port['fixed_ips']:
port = self.neutron_api.update_port_fixed_ips(
port['id'], {'fixed_ips': port_fixed_ips})
return port
def _get_cidr_for_subnet(self):
"""Returns not used cidr for service subnet creating."""
subnets = self._get_all_service_subnets()
used_cidrs = set(subnet['cidr'] for subnet in subnets)
serv_cidr = netaddr.IPNetwork(CONF.service_network_cidr)
for subnet in serv_cidr.subnet(29):
cidr = str(subnet.cidr)
if cidr not in used_cidrs:
return cidr
else:
raise exception.ServiceInstanceException(_('No available cidrs.'))
def delete_share_infrastructure(self, context, share_network_id):
"""Removes share infrastructure.
Deletes service vm and subnet, associated to share network.
"""
server = self.get_service_instance(context,
share_network_id=share_network_id,
create=False)
if server:
self._delete_server(context, server)
subnet_id = self._get_service_subnet(share_network_id)
if subnet_id:
router = self._get_private_router(share_network_id)
port = self._get_service_port()
self.neutron_api.router_remove_interface(router['id'], subnet_id)
self._remove_fixed_ip_from_service_port(port, subnet_id)
self.neutron_api.delete_subnet(subnet_id)
self._setup_connectivity_with_service_instances()
def _get_all_service_subnets(self):
service_network = self.neutron_api.get_network(self.service_network_id)
return [self.neutron_api.get_subnet(subnet_id)
for subnet_id in service_network['subnets']]
def _get_service_subnet(self, share_network_id):
all_service_subnets = self._get_all_service_subnets()
service_subnets = [subnet for subnet in all_service_subnets
if subnet['name'] == share_network_id]
if len(service_subnets) == 1:
return service_subnets[0]
elif not service_subnets:
return None
else:
raise exception.ServiceInstanceException(_('Ambiguous service '
'subnets.'))

1
manila/tests/conf_fixture.py

@ -34,5 +34,6 @@ def set_defaults(conf):
conf.set_default('sqlite_synchronous', False)
conf.set_default('policy_file', 'manila/tests/policy.json')
conf.set_default('share_export_ip', '0.0.0.0')
conf.set_default('service_instance_user', 'fake_user')
conf.set_default('share_driver',
'manila.tests.fake_driver.FakeShareDriver')

657
manila/tests/test_service_instance.py

@ -0,0 +1,657 @@
# Copyright (c) 2014 NetApp, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Unit tests for the instance module."""
import copy
import mock
from manila import context
from manila import exception
from manila.share.drivers import service_instance
from manila import test
from manila.tests.db import fakes as db_fakes
from manila.tests import fake_compute
from manila.tests import fake_network
from oslo.config import cfg
CONF = cfg.CONF
def fake_share(**kwargs):
share = {
'id': 'fakeid',
'name': 'fakename',
'size': 1,
'share_proto': 'NFS',
'share_network_id': 'fake share network id',
'export_location': '127.0.0.1:/mnt/nfs/volume-00002',
}
share.update(kwargs)
return db_fakes.FakeModel(share)
class ServiceInstanceManagerTestCase(test.TestCase):
"""Tests InstanceManager."""
def setUp