# Copyright (c) 2010 OpenStack, LLC.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
Scheduler Service
"""

from oslo_config import cfg
from oslo_log import log
from oslo_service import periodic_task
from oslo_utils import excutils
from oslo_utils import importutils

from manila.common import constants
from manila import context
from manila import coordination
from manila import db
from manila import exception
from manila import manager
from manila.message import api as message_api
from manila.message import message_field
from manila import quota
from manila import rpc
from manila.share import rpcapi as share_rpcapi

LOG = log.getLogger(__name__)

# Dotted path of the scheduler driver class that SchedulerManager loads when
# no driver is passed to its constructor.
scheduler_driver_opt = cfg.StrOpt(
    'scheduler_driver',
    default='manila.scheduler.drivers.filter.FilterScheduler',
    help='Default scheduler driver to use.',
)

CONF = cfg.CONF
CONF.register_opt(scheduler_driver_opt)

# Drivers that need to change module paths or class names can add their
# old/new path here to maintain backward compatibility.
MAPPING = {
    'manila.scheduler.chance.ChanceScheduler':
    'manila.scheduler.drivers.chance.ChanceScheduler',
    'manila.scheduler.filter_scheduler.FilterScheduler':
    'manila.scheduler.drivers.filter.FilterScheduler',
    'manila.scheduler.simple.SimpleScheduler':
    'manila.scheduler.drivers.simple.SimpleScheduler',
}


class SchedulerManager(manager.Manager):
    """Chooses a host to create shares."""

    RPC_API_VERSION = '1.9'

    def __init__(self, scheduler_driver=None, service_name=None,
                 *args, **kwargs):
        """Load the scheduler driver and the user-message API.

        :param scheduler_driver: dotted path of the driver class; falls back
            to ``CONF.scheduler_driver`` when empty. Paths listed in
            ``MAPPING`` are translated to their new location with a
            deprecation warning.
        :param service_name: accepted for interface compatibility; it is not
            used here and is not forwarded to the base class.
        """
        if not scheduler_driver:
            scheduler_driver = CONF.scheduler_driver
        if scheduler_driver in MAPPING:
            msg_args = {
                'old': scheduler_driver,
                'new': MAPPING[scheduler_driver],
            }
            LOG.warning("Scheduler driver path %(old)s is deprecated, "
                        "update your configuration to the new path "
                        "%(new)s", msg_args)
            scheduler_driver = MAPPING[scheduler_driver]

        self.driver = importutils.import_object(scheduler_driver)
        self.message_api = message_api.API()
        super(SchedulerManager, self).__init__(*args, **kwargs)

    def init_host(self):
        """Request a capabilities report using an admin context."""
        ctxt = context.get_admin_context()
        self.request_service_capabilities(ctxt)

    def get_host_list(self, context):
        """Get a list of hosts from the HostManager."""
        return self.driver.get_host_list()

    def get_service_capabilities(self, context):
        """Get the normalized set of capabilities for this zone."""
        return self.driver.get_service_capabilities()

    def update_service_capabilities(self, context, service_name=None,
                                    host=None, capabilities=None, **kwargs):
        """Process a capability update from a service node."""
        if capabilities is None:
            capabilities = {}
        self.driver.update_service_capabilities(service_name,
                                                host,
                                                capabilities)

    def create_share_instance(self, context, request_spec=None,
                              filter_properties=None):
        """Ask the driver to schedule the creation of a share.

        On ``NoValidHost`` the share is put in error state, a user message
        with the ALLOCATE_HOST action is created and the exception is not
        re-raised. Any other exception also puts the share in error state
        and is then re-raised.
        """
        try:
            self.driver.schedule_create_share(context, request_spec,
                                              filter_properties)
        except exception.NoValidHost as ex:
            self._set_share_state_and_notify(
                'create_share', {'status': constants.STATUS_ERROR},
                context, ex, request_spec,
                message_field.Action.ALLOCATE_HOST)
        except Exception as ex:
            with excutils.save_and_reraise_exception():
                self._set_share_state_and_notify(
                    'create_share', {'status': constants.STATUS_ERROR},
                    context, ex, request_spec)

    def get_pools(self, context, filters=None, cached=False):
        """Get active pools from the scheduler's cache."""
        return self.driver.get_pools(context, filters, cached)

    def manage_share(self, context, share_id, driver_options, request_spec,
                     filter_properties=None):
        """Ensure that the host exists and can accept the share."""

        def _manage_share_set_error(ex):
            # NOTE(nidhimittalhada): set size as 1 because design expects size
            # to be set, it also will allow us to handle delete/unmanage
            # operations properly with this errored share according to quotas.
            self._set_share_state_and_notify(
                'manage_share',
                {'status': constants.STATUS_MANAGE_ERROR, 'size': 1},
                context, ex, request_spec)

        share_ref = db.share_get(context, share_id)

        try:
            self.driver.host_passes_filters(
                context, share_ref['host'], request_spec, filter_properties)
        except Exception as ex:
            with excutils.save_and_reraise_exception():
                _manage_share_set_error(ex)
        else:
            share_rpcapi.ShareAPI().manage_share(context, share_ref,
                                                 driver_options)

    def migrate_share_to_host(
            self, context, share_id, host, force_host_assisted_migration,
            preserve_metadata, writable, nondisruptive, preserve_snapshots,
            new_share_network_id, new_share_type_id, request_spec,
            filter_properties=None):
        """Ensure that the host exists and can accept the share."""

        share_ref = db.share_get(context, share_id)

        def _migrate_share_set_error(ex):
            # Put the instance that was flagged as migrating back to
            # 'available' before recording the migration error on the share.
            instance = next((x for x in share_ref.instances
                             if x['status'] == constants.STATUS_MIGRATING),
                            None)
            if instance:
                db.share_instance_update(
                    context, instance['id'],
                    {'status': constants.STATUS_AVAILABLE})
            self._set_share_state_and_notify(
                'migrate_share_to_host',
                {'task_state': constants.TASK_STATE_MIGRATION_ERROR},
                context, ex, request_spec)

        try:
            tgt_host = self.driver.host_passes_filters(
                context, host, request_spec, filter_properties)
        except Exception as ex:
            with excutils.save_and_reraise_exception():
                _migrate_share_set_error(ex)
        else:
            try:
                share_rpcapi.ShareAPI().migration_start(
                    context, share_ref, tgt_host.host,
                    force_host_assisted_migration, preserve_metadata,
                    writable, nondisruptive, preserve_snapshots,
                    new_share_network_id, new_share_type_id)
            except Exception as ex:
                with excutils.save_and_reraise_exception():
                    _migrate_share_set_error(ex)

    def _set_share_state_and_notify(self, method, state, context, ex,
                                    request_spec, action=None):
        """Record a scheduling failure for a share and emit a notification.

        :param method: name of the failed operation; used in the log line
            and as the suffix of the ``scheduler.<method>`` event type.
        :param state: dict of share fields written with ``db.share_update``
            when the request spec carries a ``share_id``.
        :param ex: the exception that caused the failure.
        :param request_spec: request spec of the failed operation; ``None``
            is tolerated because the public entry points default it to None.
        :param action: optional ``message_field.Action``; when given, a user
            message is created for the share.
        """
        LOG.error("Failed to schedule %(method)s: %(ex)s",
                  {"method": method, "ex": ex})

        # Look keys up on a safe local so that this error path cannot itself
        # fail on a missing request spec; the payload keeps the original.
        spec = request_spec or {}
        properties = spec.get('share_properties', {})

        share_id = spec.get('share_id', None)

        if share_id:
            db.share_update(context, share_id, state)

        if action:
            self.message_api.create(
                context, action, context.project_id,
                resource_type=message_field.Resource.SHARE,
                resource_id=share_id, exception=ex)

        payload = dict(request_spec=request_spec,
                       share_properties=properties,
                       share_id=share_id,
                       state=state,
                       method=method,
                       reason=ex)

        rpc.get_notifier("scheduler").error(
            context, 'scheduler.' + method, payload)

    def request_service_capabilities(self, context):
        """Call ``publish_service_capabilities`` on the share RPC API."""
        share_rpcapi.ShareAPI().publish_service_capabilities(context)

    def _set_share_group_error_state(self, method, context, ex,
                                     request_spec, action=None):
        """Put a share group in error state after a scheduling failure.

        The group is updated only when the request spec carries a
        ``share_group_id``. When ``action`` is given, a user message is
        created for the share group. A ``None`` request spec is tolerated.
        """
        LOG.warning("Failed to schedule_%(method)s: %(ex)s",
                    {"method": method, "ex": ex})

        share_group_state = {'status': constants.STATUS_ERROR}

        share_group_id = (request_spec or {}).get('share_group_id')

        if share_group_id:
            db.share_group_update(context, share_group_id, share_group_state)

        if action:
            self.message_api.create(
                context, action, context.project_id,
                resource_type=message_field.Resource.SHARE_GROUP,
                resource_id=share_group_id, exception=ex)

    @periodic_task.periodic_task(spacing=600, run_immediately=True)
    def _expire_reservations(self, context):
        """Periodic task that calls ``quota.QUOTAS.expire``."""
        quota.QUOTAS.expire(context)

    def create_share_group(self, context, share_group_id, request_spec=None,
                           filter_properties=None):
        """Ask the driver to schedule the creation of a share group.

        On ``NoValidHost`` the group is put in error state, a user message
        with the ALLOCATE_HOST action is created and the exception is not
        re-raised. Any other exception also puts the group in error state
        and is then re-raised.
        """
        try:
            self.driver.schedule_create_share_group(
                context, share_group_id, request_spec, filter_properties)
        except exception.NoValidHost as ex:
            self._set_share_group_error_state(
                'create_share_group', context, ex, request_spec,
                message_field.Action.ALLOCATE_HOST)
        except Exception as ex:
            with excutils.save_and_reraise_exception():
                self._set_share_group_error_state(
                    'create_share_group', context, ex, request_spec)

    def _set_share_replica_error_state(self, context, method, exc,
                                       request_spec, action=None):
        """Put a share replica and its snapshot instances in error state.

        The replica id is read from
        ``request_spec['share_instance_properties']['id']``. A missing
        request spec, properties dict or id no longer raises from inside
        this error handler; the database updates are skipped in that case,
        matching how the share and share-group handlers guard on their ids.
        When ``action`` is given, a user message is created for the replica.
        """
        LOG.warning("Failed to schedule_%(method)s: %(exc)s",
                    {'method': method, 'exc': exc})
        status_updates = {
            'status': constants.STATUS_ERROR,
            'replica_state': constants.STATUS_ERROR,
        }
        instance_properties = (
            (request_spec or {}).get('share_instance_properties') or {})
        share_replica_id = instance_properties.get('id')

        if share_replica_id:
            # Set any snapshot instances to 'error'.
            replica_snapshots = (
                db.share_snapshot_instance_get_all_with_filters(
                    context, {'share_instance_ids': share_replica_id}))
            for snapshot_instance in replica_snapshots:
                db.share_snapshot_instance_update(
                    context, snapshot_instance['id'],
                    {'status': constants.STATUS_ERROR})

            db.share_replica_update(context, share_replica_id,
                                    status_updates)

        if action:
            self.message_api.create(
                context, action, context.project_id,
                resource_type=message_field.Resource.SHARE_REPLICA,
                resource_id=share_replica_id, exception=exc)

    def create_share_replica(self, context, request_spec=None,
                             filter_properties=None):
        """Ask the driver to schedule the creation of a share replica.

        On ``NoValidHost`` the replica is put in error state, a user message
        with the ALLOCATE_HOST action is created and the exception is not
        re-raised. Any other exception also puts the replica in error state
        and is then re-raised.
        """
        try:
            self.driver.schedule_create_replica(context, request_spec,
                                                filter_properties)

        except exception.NoValidHost as exc:
            self._set_share_replica_error_state(
                context, 'create_share_replica', exc, request_spec,
                message_field.Action.ALLOCATE_HOST)

        except Exception as exc:
            with excutils.save_and_reraise_exception():
                self._set_share_replica_error_state(
                    context, 'create_share_replica', exc, request_spec)

    @periodic_task.periodic_task(spacing=CONF.message_reap_interval,
                                 run_immediately=True)
    @coordination.synchronized('locked-clean-expired-messages')
    def _clean_expired_messages(self, context):
        """Periodic task that removes expired user messages.

        The call is wrapped in the 'locked-clean-expired-messages'
        coordination lock.
        """
        self.message_api.cleanup_expired_messages(context)