# Copyright 2012 IBM Corp.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Handles all requests to the conductor service."""

from oslo.config import cfg

from nova import baserpc
from nova.conductor import manager
from nova.conductor import rpcapi
from nova.openstack.common.gettextutils import _
from nova.openstack.common import log as logging
from nova.openstack.common.rpc import common as rpc_common
from nova import utils

conductor_opts = [
    cfg.BoolOpt('use_local',
                default=False,
                help='Perform nova-conductor operations locally'),
    cfg.StrOpt('topic',
               default='conductor',
               help='the topic conductor nodes listen on'),
    cfg.StrOpt('manager',
               default='nova.conductor.manager.ConductorManager',
               help='full class name for the Manager for conductor'),
    cfg.IntOpt('workers',
               help='Number of workers for OpenStack Conductor service')
]
conductor_group = cfg.OptGroup(name='conductor',
                               title='Conductor Options')
CONF = cfg.CONF
CONF.register_group(conductor_group)
CONF.register_opts(conductor_opts, conductor_group)

LOG = logging.getLogger(__name__)
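
# The options above live in the [conductor] group of nova.conf.  A minimal
# illustrative snippet is shown below; the values mirror the defaults
# declared above, and the 'workers' value is only an example since that
# option has no default:
#
#     [conductor]
#     use_local = False
#     topic = conductor
#     manager = nova.conductor.manager.ConductorManager
#     workers = 4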


class LocalAPI(object):
    """A local version of the conductor API that does database updates
    locally instead of via RPC.
    """

    def __init__(self):
        # TODO(danms): This needs to be something more generic for
        # other/future users of this sort of functionality.
        self._manager = utils.ExceptionHelper(manager.ConductorManager())

    def wait_until_ready(self, context, *args, **kwargs):
        # nothing to wait for in the local case.
        pass

    def instance_update(self, context, instance_uuid, **updates):
        """Perform an instance update in the database."""
        return self._manager.instance_update(context, instance_uuid,
                                             updates, 'compute')

    def instance_get(self, context, instance_id):
        return self._manager.instance_get(context, instance_id)

    def instance_get_by_uuid(self, context, instance_uuid,
                             columns_to_join=None):
        return self._manager.instance_get_by_uuid(context, instance_uuid,
                                                   columns_to_join)

    def instance_destroy(self, context, instance):
        return self._manager.instance_destroy(context, instance)

    def instance_get_all_by_host(self, context, host, columns_to_join=None):
        return self._manager.instance_get_all_by_host(
            context, host, columns_to_join=columns_to_join)

    def instance_get_all_by_host_and_node(self, context, host, node):
        return self._manager.instance_get_all_by_host(context, host, node)

    def instance_get_all_by_filters(self, context, filters,
                                    sort_key='created_at', sort_dir='desc',
                                    columns_to_join=None):
        return self._manager.instance_get_all_by_filters(context, filters,
                                                         sort_key, sort_dir,
                                                         columns_to_join)

    def instance_get_active_by_window_joined(self, context, begin, end=None,
                                             project_id=None, host=None):
        return self._manager.instance_get_active_by_window_joined(
            context, begin, end, project_id, host)

    def instance_info_cache_delete(self, context, instance):
        return self._manager.instance_info_cache_delete(context, instance)

    def instance_type_get(self, context, instance_type_id):
        return self._manager.instance_type_get(context, instance_type_id)

    def instance_fault_create(self, context, values):
        return self._manager.instance_fault_create(context, values)

    def migration_get_in_progress_by_host_and_node(self, context, host, node):
        return self._manager.migration_get_in_progress_by_host_and_node(
            context, host, node)

    def migration_update(self, context, migration, status):
        return self._manager.migration_update(context, migration, status)

    def aggregate_host_add(self, context, aggregate, host):
        return self._manager.aggregate_host_add(context, aggregate, host)

    def aggregate_host_delete(self, context, aggregate, host):
        return self._manager.aggregate_host_delete(context, aggregate, host)

    def aggregate_get(self, context, aggregate_id):
        return self._manager.aggregate_get(context, aggregate_id)

    def aggregate_get_by_host(self, context, host, key=None):
        return self._manager.aggregate_get_by_host(context, host, key)

    def aggregate_metadata_get_by_host(self, context, host,
                                       key='availability_zone'):
        return self._manager.aggregate_metadata_get_by_host(context, host,
                                                            key)

    def bw_usage_get(self, context, uuid, start_period, mac):
        return self._manager.bw_usage_update(context, uuid, mac, start_period)

    def bw_usage_update(self, context, uuid, mac, start_period,
                        bw_in, bw_out, last_ctr_in, last_ctr_out,
                        last_refreshed=None, update_cells=True):
        return self._manager.bw_usage_update(context, uuid, mac, start_period,
                                             bw_in, bw_out,
                                             last_ctr_in, last_ctr_out,
                                             last_refreshed,
                                             update_cells=update_cells)

    def provider_fw_rule_get_all(self, context):
        return self._manager.provider_fw_rule_get_all(context)

    def agent_build_get_by_triple(self, context, hypervisor, os,
                                  architecture):
        return self._manager.agent_build_get_by_triple(context, hypervisor,
                                                       os, architecture)

    def block_device_mapping_create(self, context, values):
        return self._manager.block_device_mapping_update_or_create(
            context, values, create=True)

    def block_device_mapping_update(self, context, bdm_id, values):
        values = dict(values)
        values['id'] = bdm_id
        return self._manager.block_device_mapping_update_or_create(
            context, values, create=False)

    def block_device_mapping_update_or_create(self, context, values):
        return self._manager.block_device_mapping_update_or_create(context,
                                                                    values)

    def block_device_mapping_get_all_by_instance(self, context, instance,
                                                 legacy=True):
        return self._manager.block_device_mapping_get_all_by_instance(
            context, instance, legacy)

    def block_device_mapping_destroy(self, context, bdms):
        return self._manager.block_device_mapping_destroy(context, bdms=bdms)

    def block_device_mapping_destroy_by_instance_and_device(self, context,
                                                            instance,
                                                            device_name):
        return self._manager.block_device_mapping_destroy(
            context, instance=instance, device_name=device_name)

    def block_device_mapping_destroy_by_instance_and_volume(self, context,
                                                            instance,
                                                            volume_id):
        return self._manager.block_device_mapping_destroy(
            context, instance=instance, volume_id=volume_id)

    def vol_get_usage_by_time(self, context, start_time):
        return self._manager.vol_get_usage_by_time(context, start_time)

    def vol_usage_update(self, context, vol_id, rd_req, rd_bytes, wr_req,
                         wr_bytes, instance, last_refreshed=None,
                         update_totals=False):
        return self._manager.vol_usage_update(context, vol_id,
                                              rd_req, rd_bytes,
                                              wr_req, wr_bytes,
                                              instance, last_refreshed,
                                              update_totals)

    def service_get_all(self, context):
        return self._manager.service_get_all_by(context)

    def service_get_all_by_topic(self, context, topic):
        return self._manager.service_get_all_by(context, topic=topic)

    def service_get_all_by_host(self, context, host):
        return self._manager.service_get_all_by(context, host=host)

    def service_get_by_host_and_topic(self, context, host, topic):
        return self._manager.service_get_all_by(context, topic, host)

    def service_get_by_compute_host(self, context, host):
        result = self._manager.service_get_all_by(context, 'compute', host)
        # FIXME(comstud): A major revision bump to 2.0 should return a
        # single entry, so we should just return 'result' at that point.
        return result[0]

    def service_get_by_args(self, context, host, binary):
        return self._manager.service_get_all_by(context, host=host,
                                                binary=binary)

    def action_event_start(self, context, values):
        return self._manager.action_event_start(context, values)

    def action_event_finish(self, context, values):
        return self._manager.action_event_finish(context, values)

    def service_create(self, context, values):
        return self._manager.service_create(context, values)

    def service_destroy(self, context, service_id):
        return self._manager.service_destroy(context, service_id)

    def compute_node_create(self, context, values):
        return self._manager.compute_node_create(context, values)

    def compute_node_update(self, context, node, values, prune_stats=False):
        return self._manager.compute_node_update(context, node, values,
                                                 prune_stats)

    def compute_node_delete(self, context, node):
        return self._manager.compute_node_delete(context, node)

    def service_update(self, context, service, values):
        return self._manager.service_update(context, service, values)

    def task_log_get(self, context, task_name, begin, end, host, state=None):
        return self._manager.task_log_get(context, task_name, begin, end,
                                          host, state)

    def task_log_begin_task(self, context, task_name, begin, end, host,
                            task_items=None, message=None):
        return self._manager.task_log_begin_task(context, task_name,
                                                 begin, end, host,
                                                 task_items, message)

    def task_log_end_task(self, context, task_name, begin, end, host,
                          errors, message=None):
        return self._manager.task_log_end_task(context, task_name,
                                               begin, end, host,
                                               errors, message)

    def notify_usage_exists(self, context, instance, current_period=False,
                            ignore_missing_network_data=True,
                            system_metadata=None, extra_usage_info=None):
        return self._manager.notify_usage_exists(
            context, instance, current_period, ignore_missing_network_data,
            system_metadata, extra_usage_info)

    def security_groups_trigger_handler(self, context, event, *args):
        return self._manager.security_groups_trigger_handler(context,
                                                             event, args)

    def security_groups_trigger_members_refresh(self, context, group_ids):
        return self._manager.security_groups_trigger_members_refresh(
            context, group_ids)

    def network_migrate_instance_start(self, context, instance, migration):
        return self._manager.network_migrate_instance_start(context,
                                                            instance,
                                                            migration)

    def network_migrate_instance_finish(self, context, instance, migration):
        return self._manager.network_migrate_instance_finish(context,
                                                             instance,
                                                             migration)

    def quota_commit(self, context, reservations, project_id=None,
                     user_id=None):
        return self._manager.quota_commit(context, reservations,
                                          project_id=project_id,
                                          user_id=user_id)

    def quota_rollback(self, context, reservations, project_id=None,
                       user_id=None):
        return self._manager.quota_rollback(context, reservations,
                                            project_id=project_id,
                                            user_id=user_id)

    def get_ec2_ids(self, context, instance):
        return self._manager.get_ec2_ids(context, instance)

    def compute_unrescue(self, context, instance):
        return self._manager.compute_unrescue(context, instance)

    def object_backport(self, context, objinst, target_version):
        return self._manager.object_backport(context, objinst,
                                             target_version)
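
# A minimal usage sketch, for illustration only.  The choice between the
# local and RPC flavours of this API is normally made outside this module
# based on CONF.conductor.use_local; 'context' and 'instance_uuid' below are
# assumed to be supplied by the caller:
#
#     conductor_api = LocalAPI() if CONF.conductor.use_local else API()
#     conductor_api.instance_update(context, instance_uuid,
#                                   vm_state='active', task_state=None)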


class LocalComputeTaskAPI(object):
    def __init__(self):
        # TODO(danms): This needs to be something more generic for
        # other/future users of this sort of functionality.
        self._manager = utils.ExceptionHelper(
                manager.ComputeTaskManager())

    def resize_instance(self, context, instance, extra_instance_updates,
                        scheduler_hint, flavor, reservations):
        # NOTE(comstud): 'extra_instance_updates' is not used here but is
        # needed for compatibility with the cells_rpcapi version of this
        # method.
        self._manager.migrate_server(
            context, instance, scheduler_hint, False, False,
            flavor, None, None, reservations)

    def live_migrate_instance(self, context, instance, host_name,
                              block_migration, disk_over_commit):
        scheduler_hint = {'host': host_name}
        self._manager.migrate_server(
            context, instance, scheduler_hint, True, False,
            None, block_migration, disk_over_commit, None)

    def build_instances(self, context, instances, image,
                        filter_properties, admin_password, injected_files,
                        requested_networks, security_groups,
                        block_device_mapping, legacy_bdm=True):
        utils.spawn_n(self._manager.build_instances, context,
                instances=instances, image=image,
                filter_properties=filter_properties,
                admin_password=admin_password,
                injected_files=injected_files,
                requested_networks=requested_networks,
                security_groups=security_groups,
                block_device_mapping=block_device_mapping,
                legacy_bdm=legacy_bdm)

    def unshelve_instance(self, context, instance):
        utils.spawn_n(self._manager.unshelve_instance, context,
                instance=instance)


class API(LocalAPI):
    """Conductor API that does updates via RPC to the ConductorManager."""

    def __init__(self):
        self._manager = rpcapi.ConductorAPI()
        self.base_rpcapi = baserpc.BaseAPI(topic=CONF.conductor.topic)

    def wait_until_ready(self, context, early_timeout=10, early_attempts=10):
        '''Wait until a conductor service is up and running.

        This method calls the remote ping() method on the conductor topic
        until it gets a response.  It starts with a shorter timeout in the
        loop (early_timeout) up to early_attempts number of tries.  It then
        drops back to the globally configured timeout for rpc calls for each
        retry.
        '''
        attempt = 0
        timeout = early_timeout
        while True:
            # NOTE(danms): Try ten times with a short timeout, and then punt
            # to the configured RPC timeout after that
            if attempt == early_attempts:
                timeout = None
            attempt += 1

            # NOTE(russellb): This is running during service startup. If we
            # allow an exception to be raised, the service will shut down.
            # This may fail the first time around if nova-conductor wasn't
            # running when this service started.
            try:
                self.base_rpcapi.ping(context, '1.21 GigaWatts',
                                      timeout=timeout)
                break
            except rpc_common.Timeout:
                LOG.warning(_('Timed out waiting for nova-conductor. '
                              'Is it running? Or did this service start '
                              'before nova-conductor?'))

    def instance_update(self, context, instance_uuid, **updates):
        """Perform an instance update in the database."""
        return self._manager.instance_update(context, instance_uuid,
                                             updates, 'conductor')
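
# Illustrative only: a service that depends on nova-conductor can block on
# it at startup roughly like this (an admin context is assumed to be
# available to the caller):
#
#     conductor_api = API()
#     conductor_api.wait_until_ready(admin_context)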


class ComputeTaskAPI(object):
    """ComputeTask API that queues up compute tasks for nova-conductor."""

    def __init__(self):
        self.conductor_compute_rpcapi = rpcapi.ComputeTaskAPI()

    def resize_instance(self, context, instance, extra_instance_updates,
                        scheduler_hint, flavor, reservations):
        # NOTE(comstud): 'extra_instance_updates' is not used here but is
        # needed for compatibility with the cells_rpcapi version of this
        # method.
        self.conductor_compute_rpcapi.migrate_server(
            context, instance, scheduler_hint, False, False,
            flavor, None, None, reservations)

    def live_migrate_instance(self, context, instance, host_name,
                              block_migration, disk_over_commit):
        scheduler_hint = {'host': host_name}
        self.conductor_compute_rpcapi.migrate_server(
            context, instance, scheduler_hint, True, False,
            None, block_migration, disk_over_commit, None)

    def build_instances(self, context, instances, image, filter_properties,
                        admin_password, injected_files, requested_networks,
                        security_groups, block_device_mapping,
                        legacy_bdm=True):
        self.conductor_compute_rpcapi.build_instances(context,
                instances=instances, image=image,
                filter_properties=filter_properties,
                admin_password=admin_password,
                injected_files=injected_files,
                requested_networks=requested_networks,
                security_groups=security_groups,
                block_device_mapping=block_device_mapping,
                legacy_bdm=legacy_bdm)

    def unshelve_instance(self, context, instance):
        self.conductor_compute_rpcapi.unshelve_instance(context,
                instance=instance)
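
# Illustrative only: how a caller might queue a live migration through the
# RPC compute-task API ('context' and 'instance' are assumed to come from
# the caller):
#
#     task_api = ComputeTaskAPI()
#     task_api.live_migrate_instance(context, instance, 'dest-host',
#                                    block_migration=False,
#                                    disk_over_commit=False)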