Remove conductor 2.x RPC API

*** DO NOT MERGE UNTIL MITAKA OPENS ***

A previous change added support for the 3.x RPC interface while retaining
compatibility with 2.x as a transition point. This commit removes the
old 2.x API from the server side.

Related to blueprint liberty-bump-object-and-rpcapi-versions

UpgradeImpact

Change-Id: I757541cfb35b23eb4d242a15b223ee2db02593e3
Author: Hans Lindgren
Date: 2015-03-23 17:49:50 +01:00
parent b0013d93ff
commit 4e0b995a49
2 changed files with 4 additions and 1085 deletions
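
For context on what the version bump means in practice: after this change the conductor service only advertises a 3.0 messaging target, so 2.x clients can no longer negotiate a compatible version against it. Below is a minimal, illustrative oslo.messaging sketch, not Nova code; the endpoint class, topic, and method names are made up for the example.

import oslo_messaging as messaging
from oslo_config import cfg


class ConductorEndpoint(object):
    # The endpoint's target tells the RPC dispatcher which client
    # versions this server is willing to serve; only 3.x remains.
    target = messaging.Target(version='3.0')

    def provider_fw_rule_get_all(self, context):
        return []


transport = messaging.get_transport(cfg.CONF)
server_target = messaging.Target(topic='conductor', server='host1')
server = messaging.get_rpc_server(transport, server_target,
                                  [ConductorEndpoint()],
                                  executor='blocking')

# A client still capped at the old major version cannot reach this
# server any more; its cap has to be raised to 3.0 first.
old_client = messaging.RPCClient(transport,
                                 messaging.Target(topic='conductor'),
                                 version_cap='2.3')

This is also why the commit carries the UpgradeImpact tag: anything still speaking the 2.x conductor API, for example a service pinned to it through an upgrade-level cap, has to move to 3.0 before this server-side removal is deployed.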

nova/conductor/manager.py

@@ -22,13 +22,8 @@ from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from oslo_utils import excutils
from oslo_utils import timeutils
import six
from nova.api.ec2 import ec2utils
from nova import block_device
from nova.cells import rpcapi as cells_rpcapi
from nova.compute import api as compute_api
from nova.compute import rpcapi as compute_rpcapi
from nova.compute import task_states
from nova.compute import utils as compute_utils
@@ -40,11 +35,8 @@ from nova import exception
from nova.i18n import _, _LE, _LW
from nova import image
from nova import manager
from nova import network
from nova.network.security_group import openstack_driver
from nova import objects
from nova.objects import base as nova_object
from nova import quota
from nova import rpc
from nova.scheduler import client as scheduler_client
from nova.scheduler import utils as scheduler_utils
@@ -54,21 +46,6 @@ from nova import utils
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
# Instead of having a huge list of arguments to instance_update(), we just
# accept a dict of fields to update and use this whitelist to validate it.
allowed_updates = ['task_state', 'vm_state', 'expected_task_state',
'power_state', 'access_ip_v4', 'access_ip_v6',
'launched_at', 'terminated_at', 'host', 'node',
'memory_mb', 'vcpus', 'root_gb', 'ephemeral_gb',
'instance_type_id', 'root_device_name', 'launched_on',
'progress', 'vm_mode', 'default_ephemeral_device',
'default_swap_device', 'root_device_name',
'system_metadata', 'updated_at'
]
# Fields that we want to convert back into a datetime object.
datetime_fields = ['launched_at', 'terminated_at', 'updated_at']
class ConductorManager(manager.Manager):
"""Mission: Conduct things.
@@ -83,357 +60,18 @@ class ConductorManager(manager.Manager):
namespace. See the ComputeTaskManager class for details.
"""
-target = messaging.Target(version='2.3')
+target = messaging.Target(version='3.0')
def __init__(self, *args, **kwargs):
super(ConductorManager, self).__init__(service_name='conductor',
*args, **kwargs)
self.security_group_api = (
openstack_driver.get_openstack_security_group_driver())
self._network_api = None
self._compute_api = None
self.compute_task_mgr = ComputeTaskManager()
self.cells_rpcapi = cells_rpcapi.CellsAPI()
self.additional_endpoints.append(self.compute_task_mgr)
self.additional_endpoints.append(_ConductorManagerV3Proxy(self))
@property
def network_api(self):
# NOTE(danms): We need to instantiate our network_api on first use
# to avoid the circular dependency that exists between our init
# and network_api's
if self._network_api is None:
self._network_api = network.API()
return self._network_api
@property
def compute_api(self):
if self._compute_api is None:
self._compute_api = compute_api.API()
return self._compute_api
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
@messaging.expected_exceptions(KeyError, ValueError,
exception.InvalidUUID,
exception.InstanceNotFound,
exception.UnexpectedTaskStateError)
def instance_update(self, context, instance_uuid,
updates, service):
for key, value in six.iteritems(updates):
if key not in allowed_updates:
LOG.error(_LE("Instance update attempted for "
"'%(key)s' on %(instance_uuid)s"),
{'key': key, 'instance_uuid': instance_uuid})
raise KeyError("unexpected update keyword '%s'" % key)
if key in datetime_fields and isinstance(value, six.string_types):
updates[key] = timeutils.parse_strtime(value)
instance = objects.Instance(context=context, uuid=instance_uuid,
**updates)
instance.obj_reset_changes(['uuid'])
instance.save()
return nova_object.obj_to_primitive(instance)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
@messaging.expected_exceptions(exception.InstanceNotFound)
def instance_get_by_uuid(self, context, instance_uuid,
columns_to_join):
return jsonutils.to_primitive(
self.db.instance_get_by_uuid(context, instance_uuid,
columns_to_join))
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def instance_get_all_by_host(self, context, host, node,
columns_to_join):
if node is not None:
result = self.db.instance_get_all_by_host_and_node(
context.elevated(), host, node)
else:
result = self.db.instance_get_all_by_host(context.elevated(), host,
columns_to_join)
return jsonutils.to_primitive(result)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def migration_get_in_progress_by_host_and_node(self, context,
host, node):
migrations = self.db.migration_get_in_progress_by_host_and_node(
context, host, node)
return jsonutils.to_primitive(migrations)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
@messaging.expected_exceptions(exception.AggregateHostExists)
def aggregate_host_add(self, context, aggregate, host):
host_ref = self.db.aggregate_host_add(context.elevated(),
aggregate['id'], host)
return jsonutils.to_primitive(host_ref)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
@messaging.expected_exceptions(exception.AggregateHostNotFound)
def aggregate_host_delete(self, context, aggregate, host):
self.db.aggregate_host_delete(context.elevated(),
aggregate['id'], host)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def aggregate_metadata_get_by_host(self, context, host,
key='availability_zone'):
result = self.db.aggregate_metadata_get_by_host(context, host, key)
return jsonutils.to_primitive(result)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def bw_usage_update(self, context, uuid, mac, start_period,
bw_in, bw_out, last_ctr_in, last_ctr_out,
last_refreshed, update_cells):
if [bw_in, bw_out, last_ctr_in, last_ctr_out].count(None) != 4:
self.db.bw_usage_update(context, uuid, mac, start_period,
bw_in, bw_out, last_ctr_in, last_ctr_out,
last_refreshed,
update_cells=update_cells)
usage = self.db.bw_usage_get(context, uuid, start_period, mac)
return jsonutils.to_primitive(usage)
def provider_fw_rule_get_all(self, context):
rules = self.db.provider_fw_rule_get_all(context)
return jsonutils.to_primitive(rules)
# NOTE(danms): This can be removed in version 3.0 of the RPC API
def agent_build_get_by_triple(self, context, hypervisor, os, architecture):
info = self.db.agent_build_get_by_triple(context, hypervisor, os,
architecture)
return jsonutils.to_primitive(info)
# NOTE(ndipanov): This can be removed in version 3.0 of the RPC API
def block_device_mapping_update_or_create(self, context, values, create):
if create is None:
bdm = self.db.block_device_mapping_update_or_create(context,
values)
elif create is True:
bdm = self.db.block_device_mapping_create(context, values)
else:
bdm = self.db.block_device_mapping_update(context,
values['id'],
values)
bdm_obj = objects.BlockDeviceMapping._from_db_object(
context, objects.BlockDeviceMapping(), bdm)
self.cells_rpcapi.bdm_update_or_create_at_top(context, bdm_obj,
create=create)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def block_device_mapping_get_all_by_instance(self, context, instance,
legacy):
bdms = self.db.block_device_mapping_get_all_by_instance(
context, instance['uuid'])
if legacy:
bdms = block_device.legacy_mapping(bdms)
return jsonutils.to_primitive(bdms)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def instance_get_all_by_filters(self, context, filters, sort_key,
sort_dir, columns_to_join,
use_slave):
result = self.db.instance_get_all_by_filters(
context, filters, sort_key, sort_dir,
columns_to_join=columns_to_join, use_slave=use_slave)
return jsonutils.to_primitive(result)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def instance_get_active_by_window_joined(self, context, begin, end,
project_id, host):
result = self.db.instance_get_active_by_window_joined(
context, begin, end, project_id, host)
return jsonutils.to_primitive(result)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def instance_destroy(self, context, instance):
if not isinstance(instance, objects.Instance):
instance = objects.Instance._from_db_object(context,
objects.Instance(),
instance)
instance.destroy()
return nova_object.obj_to_primitive(instance)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def instance_fault_create(self, context, values):
result = self.db.instance_fault_create(context, values)
return jsonutils.to_primitive(result)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def vol_usage_update(self, context, vol_id, rd_req, rd_bytes, wr_req,
wr_bytes, instance, last_refreshed, update_totals):
vol_usage = self.db.vol_usage_update(context, vol_id,
rd_req, rd_bytes,
wr_req, wr_bytes,
instance['uuid'],
instance['project_id'],
instance['user_id'],
instance['availability_zone'],
update_totals)
# We have just updated the database, so send the notification now
vol_usage = objects.VolumeUsage._from_db_object(
context, objects.VolumeUsage(), vol_usage)
self.notifier.info(context, 'volume.usage',
compute_utils.usage_volume_info(vol_usage))
# NOTE(hanlind): This method can be removed in version 3.0 of the RPC API
@messaging.expected_exceptions(exception.ComputeHostNotFound,
exception.HostBinaryNotFound)
def service_get_all_by(self, context, topic, host, binary):
if not any((topic, host, binary)):
result = self.db.service_get_all(context)
elif all((topic, host)):
if topic == 'compute':
result = self.db.service_get_by_compute_host(context, host)
# NOTE(sbauza): Only Juno computes are still calling this
# conductor method for getting service_get_by_compute_node,
# but expect a compute_node field so we can safely add it.
result['compute_node'
] = objects.ComputeNodeList.get_all_by_host(
context, result['host'])
# FIXME(comstud) Potentially remove this on bump to v3.0
result = [result]
else:
result = self.db.service_get_by_host_and_topic(context,
host, topic)
elif all((host, binary)):
result = self.db.service_get_by_host_and_binary(
context, host, binary)
elif topic:
result = self.db.service_get_all_by_topic(context, topic)
elif host:
result = self.db.service_get_all_by_host(context, host)
return jsonutils.to_primitive(result)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
@messaging.expected_exceptions(exception.InstanceActionNotFound)
def action_event_start(self, context, values):
evt = self.db.action_event_start(context, values)
return jsonutils.to_primitive(evt)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
@messaging.expected_exceptions(exception.InstanceActionNotFound,
exception.InstanceActionEventNotFound)
def action_event_finish(self, context, values):
evt = self.db.action_event_finish(context, values)
return jsonutils.to_primitive(evt)
# NOTE(hanlind): This method can be removed in version 3.0 of the RPC API
def service_create(self, context, values):
svc = self.db.service_create(context, values)
return jsonutils.to_primitive(svc)
# NOTE(hanlind): This method can be removed in version 3.0 of the RPC API
@messaging.expected_exceptions(exception.ServiceNotFound)
def service_destroy(self, context, service_id):
self.db.service_destroy(context, service_id)
# NOTE(hanlind): This method can be removed in version 3.0 of the RPC API
def compute_node_create(self, context, values):
result = self.db.compute_node_create(context, values)
return jsonutils.to_primitive(result)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def compute_node_update(self, context, node, values):
result = self.db.compute_node_update(context, node['id'], values)
return jsonutils.to_primitive(result)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def compute_node_delete(self, context, node):
result = self.db.compute_node_delete(context, node['id'])
return jsonutils.to_primitive(result)
# NOTE(hanlind): This method can be removed in version 3.0 of the RPC API
@messaging.expected_exceptions(exception.ServiceNotFound)
def service_update(self, context, service, values):
svc = self.db.service_update(context, service['id'], values)
return jsonutils.to_primitive(svc)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def task_log_get(self, context, task_name, begin, end, host, state):
result = self.db.task_log_get(context, task_name, begin, end, host,
state)
return jsonutils.to_primitive(result)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def task_log_begin_task(self, context, task_name, begin, end, host,
task_items, message):
result = self.db.task_log_begin_task(context.elevated(), task_name,
begin, end, host, task_items,
message)
return jsonutils.to_primitive(result)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def task_log_end_task(self, context, task_name, begin, end, host,
errors, message):
result = self.db.task_log_end_task(context.elevated(), task_name,
begin, end, host, errors, message)
return jsonutils.to_primitive(result)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def notify_usage_exists(self, context, instance, current_period,
ignore_missing_network_data,
system_metadata, extra_usage_info):
if not isinstance(instance, objects.Instance):
attrs = ['metadata', 'system_metadata']
instance = objects.Instance._from_db_object(context,
objects.Instance(),
instance,
expected_attrs=attrs)
compute_utils.notify_usage_exists(self.notifier, context, instance,
current_period,
ignore_missing_network_data,
system_metadata, extra_usage_info)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def security_groups_trigger_handler(self, context, event, args):
self.security_group_api.trigger_handler(event, context, *args)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def security_groups_trigger_members_refresh(self, context, group_ids):
self.security_group_api.trigger_members_refresh(context, group_ids)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def network_migrate_instance_start(self, context, instance, migration):
self.network_api.migrate_instance_start(context, instance, migration)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def network_migrate_instance_finish(self, context, instance, migration):
self.network_api.migrate_instance_finish(context, instance, migration)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def quota_commit(self, context, reservations, project_id=None,
user_id=None):
quota.QUOTAS.commit(context, reservations, project_id=project_id,
user_id=user_id)
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def quota_rollback(self, context, reservations, project_id=None,
user_id=None):
quota.QUOTAS.rollback(context, reservations, project_id=project_id,
user_id=user_id)
# NOTE(hanlind): This method can be removed in version 3.0 of the RPC API
def get_ec2_ids(self, context, instance):
ec2_ids = {}
ec2_ids['instance-id'] = ec2utils.id_to_ec2_inst_id(instance['uuid'])
ec2_ids['ami-id'] = ec2utils.glance_id_to_ec2_id(context,
instance['image_ref'])
for image_type in ['kernel', 'ramdisk']:
image_id = instance.get('%s_id' % image_type)
if image_id is not None:
ec2_image_type = ec2utils.image_type(image_type)
ec2_id = ec2utils.glance_id_to_ec2_id(context, image_id,
ec2_image_type)
ec2_ids['%s-id' % image_type] = ec2_id
return ec2_ids
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def compute_unrescue(self, context, instance):
self.compute_api.unrescue(context, instance)
def _object_dispatch(self, target, method, args, kwargs):
"""Dispatch a call to an object method.
@@ -448,20 +86,6 @@ class ConductorManager(manager.Manager):
except Exception:
raise messaging.ExpectedException()
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def object_class_action(self, context, objname, objmethod,
objver, args, kwargs):
"""Perform a classmethod action on an object."""
objclass = nova_object.NovaObject.obj_class_from_name(objname,
objver)
args = tuple([context] + list(args))
result = self._object_dispatch(objclass, objmethod, args, kwargs)
# NOTE(danms): The RPC layer will convert to primitives for us,
# but in this case, we need to honor the version the client is
# asking for, so we do it before returning here.
return (result.obj_to_primitive(target_version=objver)
if isinstance(result, nova_object.NovaObject) else result)
def object_class_action_versions(self, context, objname, objmethod,
object_versions, args, kwargs):
objclass = nova_object.NovaObject.obj_class_from_name(
@@ -496,10 +120,6 @@ class ConductorManager(manager.Manager):
updates['obj_what_changed'] = objinst.obj_what_changed()
return updates, result
# NOTE(hanlind): This can be removed in version 3.0 of the RPC API
def object_backport(self, context, objinst, target_version):
return objinst.obj_to_primitive(target_version=target_version)
def object_backport_versions(self, context, objinst, object_versions):
target = object_versions[objinst.obj_name()]
LOG.debug('Backporting %(obj)s to %(ver)s with versions %(manifest)s',
@@ -919,27 +539,3 @@ class ComputeTaskManager(base.Base):
preserve_ephemeral=preserve_ephemeral,
migration=migration,
host=host, node=node, limits=limits)
class _ConductorManagerV3Proxy(object):
target = messaging.Target(version='3.0')
def __init__(self, manager):
self.manager = manager
def provider_fw_rule_get_all(self, context):
return self.manager.provider_fw_rule_get_all(context)
def object_class_action_versions(self, context, objname, objmethod,
object_versions, args, kwargs):
return self.manager.object_class_action_versions(
context, objname, objmethod, object_versions, args, kwargs)
def object_action(self, context, objinst, objmethod, args, kwargs):
return self.manager.object_action(context, objinst, objmethod, args,
kwargs)
def object_backport_versions(self, context, objinst, object_versions):
return self.manager.object_backport_versions(context, objinst,
object_versions)
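
The _ConductorManagerV3Proxy deleted above was the transitional piece: during the 2.x to 3.x switchover the same manager registered an extra endpoint with its own 3.0 target, so one service answered both old and new clients, and the dispatcher picked the endpoint whose major version matched the caller. A rough sketch of that dual-endpoint arrangement, using purely illustrative class and method names rather than the real Nova wiring:

import oslo_messaging as messaging
from oslo_config import cfg


class ManagerV2(object):
    # Legacy surface; answers callers speaking 2.x.
    target = messaging.Target(version='2.3')

    def ping(self, context):
        return 'pong'


class ManagerV3Proxy(object):
    # Thin shim re-exposing the surviving methods under a 3.0 target,
    # delegating to the real manager.
    target = messaging.Target(version='3.0')

    def __init__(self, manager):
        self.manager = manager

    def ping(self, context):
        return self.manager.ping(context)


transport = messaging.get_transport(cfg.CONF)
manager = ManagerV2()
endpoints = [manager, ManagerV3Proxy(manager)]
server = messaging.get_rpc_server(
    transport, messaging.Target(topic='conductor', server='host1'),
    endpoints, executor='blocking')
# With 2.x support dropped (this commit), the proxy is redundant and the
# manager itself simply carries the 3.0 target.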

nova/tests/unit/conductor/test_conductor.py

@@ -25,12 +25,9 @@ import oslo_messaging as messaging
from oslo_utils import timeutils
import six
from nova.api.ec2 import ec2utils
from nova.compute import arch
from nova.compute import flavors
from nova.compute import rpcapi as compute_rpcapi
from nova.compute import task_states
from nova.compute import utils as compute_utils
from nova.compute import vm_states
from nova import conductor
from nova.conductor import api as conductor_api
@@ -40,13 +37,10 @@ from nova.conductor.tasks import live_migrate
from nova.conductor.tasks import migrate
from nova import context
from nova import db
from nova.db.sqlalchemy import models
from nova import exception as exc
from nova.image import api as image_api
from nova import notifications
from nova import objects
from nova.objects import base as obj_base
from nova.objects import block_device as block_device_obj
from nova.objects import fields
from nova import rpc
from nova.scheduler import client as scheduler_client
@@ -54,18 +48,13 @@ from nova.scheduler import utils as scheduler_utils
from nova import test
from nova.tests.unit import cast_as_call
from nova.tests.unit.compute import test_compute
from nova.tests.unit import fake_block_device
from nova.tests.unit import fake_instance
from nova.tests.unit import fake_notifier
from nova.tests.unit import fake_server_actions
from nova.tests.unit import fake_utils
from nova.tests.unit.objects import test_volume_usage
from nova import utils
FAKE_IMAGE_REF = 'fake-image-ref'
class FakeContext(context.RequestContext):
def elevated(self):
"""Return a consistent elevated context so we can detect it."""
@@ -111,216 +100,6 @@ class ConductorTestCase(_BaseTestCase, test.TestCase):
self.conductor = conductor_manager.ConductorManager()
self.conductor_manager = self.conductor
def _create_fake_instance(self, params=None, type_name='m1.tiny'):
if not params:
params = {}
inst = {}
inst['vm_state'] = vm_states.ACTIVE
inst['image_ref'] = FAKE_IMAGE_REF
inst['reservation_id'] = 'r-fakeres'
inst['user_id'] = self.user_id
inst['project_id'] = self.project_id
inst['host'] = 'fake_host'
type_id = flavors.get_flavor_by_name(type_name)['id']
inst['instance_type_id'] = type_id
inst['ami_launch_index'] = 0
inst['memory_mb'] = 0
inst['vcpus'] = 0
inst['root_gb'] = 0
inst['ephemeral_gb'] = 0
inst['architecture'] = arch.X86_64
inst['os_type'] = 'Linux'
inst['availability_zone'] = 'fake-az'
inst.update(params)
return db.instance_create(self.context, inst)
def _do_update(self, instance_uuid, **updates):
return self.conductor.instance_update(self.context, instance_uuid,
updates, None)
def test_instance_update(self):
instance = self._create_fake_instance()
new_inst = self._do_update(instance['uuid'],
vm_state=vm_states.STOPPED)
instance = db.instance_get_by_uuid(self.context, instance['uuid'])
self.assertEqual(instance['vm_state'], vm_states.STOPPED)
self.assertEqual(new_inst['vm_state'], instance['vm_state'])
def test_instance_update_invalid_key(self):
# NOTE(danms): the real DB API call ignores invalid keys
if self.db is None:
self.conductor = utils.ExceptionHelper(self.conductor)
self.assertRaises(KeyError,
self._do_update, 'any-uuid', foobar=1)
def test_instance_get_by_uuid(self):
orig_instance = self._create_fake_instance()
copy_instance = self.conductor.instance_get_by_uuid(
self.context, orig_instance['uuid'], None)
self.assertEqual(orig_instance['name'],
copy_instance['name'])
def test_block_device_mapping_update_or_create(self):
fake_bdm = {'id': 1, 'device_name': 'foo',
'source_type': 'volume', 'volume_id': 'fake-vol-id',
'destination_type': 'volume'}
fake_bdm = fake_block_device.FakeDbBlockDeviceDict(fake_bdm)
fake_bdm2 = {'id': 1, 'device_name': 'foo2',
'source_type': 'volume', 'volume_id': 'fake-vol-id',
'destination_type': 'volume'}
fake_bdm2 = fake_block_device.FakeDbBlockDeviceDict(fake_bdm2)
cells_rpcapi = self.conductor.cells_rpcapi
self.mox.StubOutWithMock(db, 'block_device_mapping_create')
self.mox.StubOutWithMock(db, 'block_device_mapping_update')
self.mox.StubOutWithMock(db, 'block_device_mapping_update_or_create')
self.mox.StubOutWithMock(cells_rpcapi,
'bdm_update_or_create_at_top')
db.block_device_mapping_create(self.context,
fake_bdm).AndReturn(fake_bdm2)
cells_rpcapi.bdm_update_or_create_at_top(
self.context, mox.IsA(block_device_obj.BlockDeviceMapping),
create=True)
db.block_device_mapping_update(self.context, fake_bdm['id'],
fake_bdm).AndReturn(fake_bdm2)
cells_rpcapi.bdm_update_or_create_at_top(
self.context, mox.IsA(block_device_obj.BlockDeviceMapping),
create=False)
self.mox.ReplayAll()
self.conductor.block_device_mapping_update_or_create(self.context,
fake_bdm,
create=True)
self.conductor.block_device_mapping_update_or_create(self.context,
fake_bdm,
create=False)
def test_instance_get_all_by_filters(self):
filters = {'foo': 'bar'}
self.mox.StubOutWithMock(db, 'instance_get_all_by_filters')
db.instance_get_all_by_filters(self.context, filters,
'fake-key', 'fake-sort',
columns_to_join=None, use_slave=False)
self.mox.ReplayAll()
self.conductor.instance_get_all_by_filters(self.context, filters,
'fake-key', 'fake-sort',
None, False)
def test_instance_get_all_by_filters_use_slave(self):
filters = {'foo': 'bar'}
self.mox.StubOutWithMock(db, 'instance_get_all_by_filters')
db.instance_get_all_by_filters(self.context, filters,
'fake-key', 'fake-sort',
columns_to_join=None, use_slave=True)
self.mox.ReplayAll()
self.conductor.instance_get_all_by_filters(self.context, filters,
'fake-key', 'fake-sort',
columns_to_join=None,
use_slave=True)
def test_instance_get_all_by_host(self):
self.mox.StubOutWithMock(db, 'instance_get_all_by_host')
self.mox.StubOutWithMock(db, 'instance_get_all_by_host_and_node')
db.instance_get_all_by_host(self.context.elevated(),
'host', None).AndReturn('result')
db.instance_get_all_by_host_and_node(self.context.elevated(), 'host',
'node').AndReturn('result')
self.mox.ReplayAll()
result = self.conductor.instance_get_all_by_host(self.context, 'host',
None, None)
self.assertEqual(result, 'result')
result = self.conductor.instance_get_all_by_host(self.context, 'host',
'node', None)
self.assertEqual(result, 'result')
def _test_stubbed(self, name, dbargs, condargs,
db_result_listified=False, db_exception=None):
self.mox.StubOutWithMock(db, name)
if db_exception:
getattr(db, name)(self.context, *dbargs).AndRaise(db_exception)
getattr(db, name)(self.context, *dbargs).AndRaise(db_exception)
else:
getattr(db, name)(self.context, *dbargs).AndReturn(condargs)
if name == 'service_get_by_compute_host':
self.mox.StubOutWithMock(
objects.ComputeNodeList, 'get_all_by_host')
objects.ComputeNodeList.get_all_by_host(
self.context, mox.IgnoreArg()
).AndReturn(['fake-compute'])
self.mox.ReplayAll()
if db_exception:
self.assertRaises(messaging.ExpectedException,
self.conductor.service_get_all_by,
self.context, **condargs)
self.conductor = utils.ExceptionHelper(self.conductor)
self.assertRaises(db_exception.__class__,
self.conductor.service_get_all_by,
self.context, **condargs)
else:
result = self.conductor.service_get_all_by(self.context,
**condargs)
if db_result_listified:
if name == 'service_get_by_compute_host':
condargs['compute_node'] = ['fake-compute']
self.assertEqual([condargs], result)
else:
self.assertEqual(condargs, result)
def test_service_get_all(self):
self._test_stubbed('service_get_all', (),
dict(host=None, topic=None, binary=None))
def test_service_get_by_host_and_topic(self):
self._test_stubbed('service_get_by_host_and_topic',
('host', 'topic'),
dict(topic='topic', host='host', binary=None))
def test_service_get_all_by_topic(self):
self._test_stubbed('service_get_all_by_topic',
('topic',),
dict(topic='topic', host=None, binary=None))
def test_service_get_all_by_host(self):
self._test_stubbed('service_get_all_by_host',
('host',),
dict(host='host', topic=None, binary=None))
def test_service_get_by_compute_host(self):
self._test_stubbed('service_get_by_compute_host',
('host',),
dict(topic='compute', host='host', binary=None),
db_result_listified=True)
def test_service_get_by_host_and_binary(self):
self._test_stubbed('service_get_by_host_and_binary',
('host', 'binary'),
dict(host='host', binary='binary', topic=None))
def test_service_get_by_compute_host_not_found(self):
self._test_stubbed('service_get_by_compute_host',
('host',),
dict(topic='compute', host='host', binary=None),
db_exception=exc.ComputeHostNotFound(host='host'))
def test_service_get_by_host_and_binary_not_found(self):
self._test_stubbed('service_get_by_host_and_binary',
('host', 'binary'),
dict(host='host', binary='binary', topic=None),
db_exception=exc.HostBinaryNotFound(binary='binary',
host='host'))
def test_security_groups_trigger_handler(self):
self.mox.StubOutWithMock(self.conductor_manager.security_group_api,
'trigger_handler')
self.conductor_manager.security_group_api.trigger_handler('event',
self.context,
'args')
self.mox.ReplayAll()
self.conductor.security_groups_trigger_handler(self.context,
'event', ['args'])
def _test_object_action(self, is_classmethod, raise_exception):
class TestObject(obj_base.NovaObject):
def foo(self, raise_exception=False):
@@ -343,8 +122,9 @@ class ConductorTestCase(_BaseTestCase, test.TestCase):
# so use a list here to make sure we can handle it
fake_args = []
if is_classmethod:
-result = self.conductor.object_class_action(
-    self.context, TestObject.obj_name(), 'bar', '1.0',
+versions = {'TestObject': '1.0'}
+result = self.conductor.object_class_action_versions(
+    self.context, TestObject.obj_name(), 'bar', versions,
fake_args, {'raise_exception': raise_exception})
else:
updates, result = self.conductor.object_action(
@@ -410,399 +190,6 @@ class ConductorTestCase(_BaseTestCase, test.TestCase):
m.return_value.obj_to_primitive.assert_called_once_with(
target_version='1.2', version_manifest=versions)
def _test_expected_exceptions(self, db_method, conductor_method, errors,
*args, **kwargs):
# Tests that expected exceptions are handled properly.
for error in errors:
with mock.patch.object(db, db_method, side_effect=error):
self.assertRaises(messaging.ExpectedException,
conductor_method,
self.context, *args, **kwargs)
def test_action_event_start_expected_exceptions(self):
error = exc.InstanceActionNotFound(request_id='1', instance_uuid='2')
self._test_expected_exceptions(
'action_event_start', self.conductor.action_event_start, [error],
{'foo': 'bar'})
def test_action_event_finish_expected_exceptions(self):
errors = (exc.InstanceActionNotFound(request_id='1',
instance_uuid='2'),
exc.InstanceActionEventNotFound(event='1', action_id='2'))
self._test_expected_exceptions(
'action_event_finish', self.conductor.action_event_finish,
errors, {'foo': 'bar'})
def test_instance_update_expected_exceptions(self):
errors = (exc.InvalidUUID(uuid='foo'),
exc.InstanceNotFound(instance_id=1),
exc.UnexpectedTaskStateError(instance_uuid='fake_uuid',
expected={'task_state': 'foo'},
actual={'task_state': 'bar'}))
self._test_expected_exceptions(
'instance_update', self.conductor.instance_update,
errors, None, {'foo': 'bar'}, None)
def test_instance_get_by_uuid_expected_exceptions(self):
error = exc.InstanceNotFound(instance_id=1)
self._test_expected_exceptions(
'instance_get_by_uuid', self.conductor.instance_get_by_uuid,
[error], None, [])
def test_aggregate_host_add_expected_exceptions(self):
error = exc.AggregateHostExists(aggregate_id=1, host='foo')
self._test_expected_exceptions(
'aggregate_host_add', self.conductor.aggregate_host_add,
[error], {'id': 1}, None)
def test_aggregate_host_delete_expected_exceptions(self):
error = exc.AggregateHostNotFound(aggregate_id=1, host='foo')
self._test_expected_exceptions(
'aggregate_host_delete', self.conductor.aggregate_host_delete,
[error], {'id': 1}, None)
def test_service_update_expected_exceptions(self):
error = exc.ServiceNotFound(service_id=1)
self._test_expected_exceptions(
'service_update',
self.conductor.service_update,
[error], {'id': 1}, None)
def test_service_destroy_expected_exceptions(self):
error = exc.ServiceNotFound(service_id=1)
self._test_expected_exceptions(
'service_destroy',
self.conductor.service_destroy,
[error], 1)
def _setup_aggregate_with_host(self):
aggregate_ref = db.aggregate_create(self.context.elevated(),
{'name': 'foo'}, metadata={'availability_zone': 'foo'})
self.conductor.aggregate_host_add(self.context, aggregate_ref, 'bar')
aggregate_ref = db.aggregate_get(self.context.elevated(),
aggregate_ref['id'])
return aggregate_ref
def test_aggregate_host_add(self):
aggregate_ref = self._setup_aggregate_with_host()
self.assertIn('bar', aggregate_ref['hosts'])
db.aggregate_delete(self.context.elevated(), aggregate_ref['id'])
def test_aggregate_host_delete(self):
aggregate_ref = self._setup_aggregate_with_host()
self.conductor.aggregate_host_delete(self.context, aggregate_ref,
'bar')
aggregate_ref = db.aggregate_get(self.context.elevated(),
aggregate_ref['id'])
self.assertNotIn('bar', aggregate_ref['hosts'])
db.aggregate_delete(self.context.elevated(), aggregate_ref['id'])
def test_network_migrate_instance_start(self):
self.mox.StubOutWithMock(self.conductor_manager.network_api,
'migrate_instance_start')
self.conductor_manager.network_api.migrate_instance_start(self.context,
'instance',
'migration')
self.mox.ReplayAll()
self.conductor.network_migrate_instance_start(self.context,
'instance',
'migration')
def test_network_migrate_instance_finish(self):
self.mox.StubOutWithMock(self.conductor_manager.network_api,
'migrate_instance_finish')
self.conductor_manager.network_api.migrate_instance_finish(
self.context, 'instance', 'migration')
self.mox.ReplayAll()
self.conductor.network_migrate_instance_finish(self.context,
'instance',
'migration')
def test_instance_destroy(self):
instance = objects.Instance(id=1, uuid='fake-uuid')
@mock.patch.object(instance, 'destroy')
@mock.patch.object(obj_base, 'obj_to_primitive',
return_value='fake-result')
def do_test(mock_to_primitive, mock_destroy):
result = self.conductor.instance_destroy(self.context, instance)
mock_destroy.assert_called_once_with()
mock_to_primitive.assert_called_once_with(instance)
self.assertEqual(result, 'fake-result')
do_test()
def test_compute_unrescue(self):
self.mox.StubOutWithMock(self.conductor_manager.compute_api,
'unrescue')
self.conductor_manager.compute_api.unrescue(self.context, 'instance')
self.mox.ReplayAll()
self.conductor.compute_unrescue(self.context, 'instance')
def test_instance_get_active_by_window_joined(self):
self.mox.StubOutWithMock(db, 'instance_get_active_by_window_joined')
db.instance_get_active_by_window_joined(self.context, 'fake-begin',
'fake-end', 'fake-proj',
'fake-host')
self.mox.ReplayAll()
self.conductor.instance_get_active_by_window_joined(
self.context, 'fake-begin', 'fake-end', 'fake-proj', 'fake-host')
def test_instance_fault_create(self):
self.mox.StubOutWithMock(db, 'instance_fault_create')
db.instance_fault_create(self.context, 'fake-values').AndReturn(
'fake-result')
self.mox.ReplayAll()
result = self.conductor.instance_fault_create(self.context,
'fake-values')
self.assertEqual(result, 'fake-result')
def test_action_event_start(self):
self.mox.StubOutWithMock(db, 'action_event_start')
db.action_event_start(self.context, mox.IgnoreArg())
self.mox.ReplayAll()
self.conductor.action_event_start(self.context, {})
def test_action_event_finish(self):
self.mox.StubOutWithMock(db, 'action_event_finish')
db.action_event_finish(self.context, mox.IgnoreArg())
self.mox.ReplayAll()
self.conductor.action_event_finish(self.context, {})
def test_agent_build_get_by_triple(self):
self.mox.StubOutWithMock(db, 'agent_build_get_by_triple')
db.agent_build_get_by_triple(self.context, 'fake-hv', 'fake-os',
'fake-arch').AndReturn('it worked')
self.mox.ReplayAll()
result = self.conductor.agent_build_get_by_triple(self.context,
'fake-hv',
'fake-os',
'fake-arch')
self.assertEqual(result, 'it worked')
def test_bw_usage_update(self):
self.mox.StubOutWithMock(db, 'bw_usage_update')
self.mox.StubOutWithMock(db, 'bw_usage_get')
update_args = (self.context, 'uuid', 'mac', 0, 10, 20, 5, 10, 20)
get_args = (self.context, 'uuid', 0, 'mac')
db.bw_usage_update(*update_args, update_cells=True)
db.bw_usage_get(*get_args).AndReturn('foo')
self.mox.ReplayAll()
result = self.conductor.bw_usage_update(*update_args,
update_cells=True)
self.assertEqual(result, 'foo')
@mock.patch.object(notifications, 'audit_period_bounds')
@mock.patch.object(notifications, 'bandwidth_usage')
@mock.patch.object(compute_utils, 'notify_about_instance_usage')
def test_notify_usage_exists(self, mock_notify, mock_bw, mock_audit):
info = {
'audit_period_beginning': 'start',
'audit_period_ending': 'end',
'bandwidth': 'bw_usage',
'image_meta': {},
'extra': 'info',
}
instance = objects.Instance(id=1, system_metadata={})
mock_audit.return_value = ('start', 'end')
mock_bw.return_value = 'bw_usage'
self.conductor.notify_usage_exists(self.context, instance, False, True,
system_metadata={},
extra_usage_info=dict(extra='info'))
class MatchInstance(object):
def __eq__(self, thing):
return thing.id == instance.id
notifier = self.conductor_manager.notifier
mock_audit.assert_called_once_with(False)
mock_bw.assert_called_once_with(MatchInstance(), 'start', True)
mock_notify.assert_called_once_with(notifier, self.context,
MatchInstance(),
'exists', system_metadata={},
extra_usage_info=info)
def test_get_ec2_ids(self):
expected = {
'instance-id': 'ec2-inst-id',
'ami-id': 'ec2-ami-id',
'kernel-id': 'ami-kernel-ec2-kernelid',
'ramdisk-id': 'ami-ramdisk-ec2-ramdiskid',
}
inst = {
'uuid': 'fake-uuid',
'kernel_id': 'ec2-kernelid',
'ramdisk_id': 'ec2-ramdiskid',
'image_ref': 'fake-image',
}
self.mox.StubOutWithMock(ec2utils, 'id_to_ec2_inst_id')
self.mox.StubOutWithMock(ec2utils, 'glance_id_to_ec2_id')
self.mox.StubOutWithMock(ec2utils, 'image_type')
ec2utils.id_to_ec2_inst_id(inst['uuid']).AndReturn(
expected['instance-id'])
ec2utils.glance_id_to_ec2_id(self.context,
inst['image_ref']).AndReturn(
expected['ami-id'])
for image_type in ['kernel', 'ramdisk']:
image_id = inst['%s_id' % image_type]
ec2utils.image_type(image_type).AndReturn('ami-' + image_type)
ec2utils.glance_id_to_ec2_id(self.context, image_id,
'ami-' + image_type).AndReturn(
'ami-%s-ec2-%sid' % (image_type, image_type))
self.mox.ReplayAll()
result = self.conductor.get_ec2_ids(self.context, inst)
self.assertEqual(result, expected)
def test_migration_get_in_progress_by_host_and_node(self):
self.mox.StubOutWithMock(db,
'migration_get_in_progress_by_host_and_node')
db.migration_get_in_progress_by_host_and_node(
self.context, 'fake-host', 'fake-node').AndReturn('fake-result')
self.mox.ReplayAll()
result = self.conductor.migration_get_in_progress_by_host_and_node(
self.context, 'fake-host', 'fake-node')
self.assertEqual(result, 'fake-result')
def test_aggregate_metadata_get_by_host(self):
self.mox.StubOutWithMock(db, 'aggregate_metadata_get_by_host')
db.aggregate_metadata_get_by_host(self.context, 'host',
'key').AndReturn('result')
self.mox.ReplayAll()
result = self.conductor.aggregate_metadata_get_by_host(self.context,
'host', 'key')
self.assertEqual(result, 'result')
def test_block_device_mapping_get_all_by_instance(self):
fake_inst = {'uuid': 'fake-uuid'}
self.mox.StubOutWithMock(db,
'block_device_mapping_get_all_by_instance')
db.block_device_mapping_get_all_by_instance(
self.context, fake_inst['uuid']).AndReturn('fake-result')
self.mox.ReplayAll()
result = self.conductor.block_device_mapping_get_all_by_instance(
self.context, fake_inst, legacy=False)
self.assertEqual(result, 'fake-result')
def test_compute_node_update(self):
node = {'id': 'fake-id'}
self.mox.StubOutWithMock(db, 'compute_node_update')
db.compute_node_update(self.context, node['id'], {'fake': 'values'}).\
AndReturn('fake-result')
self.mox.ReplayAll()
result = self.conductor.compute_node_update(self.context, node,
{'fake': 'values'})
self.assertEqual(result, 'fake-result')
def test_compute_node_delete(self):
node = {'id': 'fake-id'}
self.mox.StubOutWithMock(db, 'compute_node_delete')
db.compute_node_delete(self.context, node['id']).AndReturn(None)
self.mox.ReplayAll()
result = self.conductor.compute_node_delete(self.context, node)
self.assertIsNone(result)
def test_task_log_get(self):
self.mox.StubOutWithMock(db, 'task_log_get')
db.task_log_get(self.context, 'task', 'begin', 'end', 'host',
'state').AndReturn('result')
self.mox.ReplayAll()
result = self.conductor.task_log_get(self.context, 'task', 'begin',
'end', 'host', 'state')
self.assertEqual(result, 'result')
def test_task_log_get_with_no_state(self):
self.mox.StubOutWithMock(db, 'task_log_get')
db.task_log_get(self.context, 'task', 'begin', 'end',
'host', None).AndReturn('result')
self.mox.ReplayAll()
result = self.conductor.task_log_get(self.context, 'task', 'begin',
'end', 'host', None)
self.assertEqual(result, 'result')
def test_task_log_begin_task(self):
self.mox.StubOutWithMock(db, 'task_log_begin_task')
db.task_log_begin_task(self.context.elevated(), 'task', 'begin',
'end', 'host', 'items',
'message').AndReturn('result')
self.mox.ReplayAll()
result = self.conductor.task_log_begin_task(
self.context, 'task', 'begin', 'end', 'host', 'items', 'message')
self.assertEqual(result, 'result')
def test_task_log_end_task(self):
self.mox.StubOutWithMock(db, 'task_log_end_task')
db.task_log_end_task(self.context.elevated(), 'task', 'begin', 'end',
'host', 'errors', 'message').AndReturn('result')
self.mox.ReplayAll()
result = self.conductor.task_log_end_task(
self.context, 'task', 'begin', 'end', 'host', 'errors', 'message')
self.assertEqual(result, 'result')
def test_security_groups_trigger_members_refresh(self):
self.mox.StubOutWithMock(self.conductor_manager.security_group_api,
'trigger_members_refresh')
self.conductor_manager.security_group_api.trigger_members_refresh(
self.context, [1, 2, 3])
self.mox.ReplayAll()
self.conductor.security_groups_trigger_members_refresh(self.context,
[1, 2, 3])
def test_vol_usage_update(self):
self.mox.StubOutWithMock(db, 'vol_usage_update')
self.mox.StubOutWithMock(compute_utils, 'usage_volume_info')
fake_inst = {'uuid': 'fake-uuid',
'project_id': 'fake-project',
'user_id': 'fake-user',
'availability_zone': 'fake-az',
}
db.vol_usage_update(self.context, 'fake-vol', 22, 33, 44, 55,
fake_inst['uuid'],
fake_inst['project_id'],
fake_inst['user_id'],
fake_inst['availability_zone'],
False).AndReturn(test_volume_usage.fake_vol_usage)
compute_utils.usage_volume_info(
mox.IsA(objects.VolumeUsage)).AndReturn('fake-info')
self.mox.ReplayAll()
self.conductor.vol_usage_update(self.context, 'fake-vol',
22, 33, 44, 55, fake_inst, None, False)
self.assertEqual(1, len(fake_notifier.NOTIFICATIONS))
msg = fake_notifier.NOTIFICATIONS[0]
self.assertEqual('conductor.%s' % self.conductor_manager.host,
msg.publisher_id)
self.assertEqual('volume.usage', msg.event_type)
self.assertEqual('INFO', msg.priority)
self.assertEqual('fake-info', msg.payload)
def test_compute_node_create(self):
self.mox.StubOutWithMock(db, 'compute_node_create')
db.compute_node_create(self.context, 'fake-values').AndReturn(
'fake-result')
self.mox.ReplayAll()
result = self.conductor.compute_node_create(self.context,
'fake-values')
self.assertEqual(result, 'fake-result')
class ConductorRPCAPITestCase(_BaseTestCase, test.TestCase):
"""Conductor RPC API Tests."""
@@ -890,49 +277,6 @@ class ConductorImportTest(test.TestCase):
conductor_api.LocalComputeTaskAPI)
class ConductorPolicyTest(test.TestCase):
def test_all_allowed_keys(self):
ctxt = context.RequestContext('fake-user', 'fake-project')
conductor = conductor_manager.ConductorManager()
updates = {}
for key in conductor_manager.allowed_updates:
if key in conductor_manager.datetime_fields:
updates[key] = timeutils.utcnow()
elif key == 'access_ip_v4':
updates[key] = '10.0.0.2'
elif key == 'access_ip_v6':
updates[key] = '2001:db8:0:1::1'
elif key in ('instance_type_id', 'memory_mb', 'ephemeral_gb',
'root_gb', 'vcpus', 'power_state', 'progress'):
updates[key] = 5
elif key == 'system_metadata':
updates[key] = {'foo': 'foo'}
else:
updates[key] = 'foo'
def fake_save(inst):
# id that comes back from db after updating
inst.id = 1
with mock.patch.object(objects.Instance, 'save',
side_effect=fake_save,
autospec=True) as mock_save:
conductor.instance_update(ctxt, 'fake-instance', updates,
'conductor')
mock_save.assert_called_once_with(mock.ANY)
def test_allowed_keys_are_real(self):
instance = models.Instance()
keys = list(conductor_manager.allowed_updates)
# NOTE(danms): expected_task_state is a parameter that gets
# passed to the db layer, but is not actually an instance attribute
del keys[keys.index('expected_task_state')]
for key in keys:
self.assertTrue(hasattr(instance, key))
class _BaseTaskTestCase(object):
def setUp(self):
super(_BaseTaskTestCase, self).setUp()
@@ -2105,24 +1449,3 @@ class ConductorLocalComputeTaskAPITestCase(ConductorTaskAPITestCase):
super(ConductorLocalComputeTaskAPITestCase, self).setUp()
self.conductor = conductor_api.LocalComputeTaskAPI()
self.conductor_manager = self.conductor._manager._target
class ConductorV3ManagerProxyTestCase(test.NoDBTestCase):
def test_v3_manager_proxy(self):
manager = conductor_manager.ConductorManager()
proxy = conductor_manager._ConductorManagerV3Proxy(manager)
ctxt = context.get_admin_context()
methods = [
# (method, number_of_args)
('provider_fw_rule_get_all', 0),
('object_class_action_versions', 5),
('object_action', 4),
('object_backport_versions', 2),
]
for method, num_args in methods:
args = range(num_args)
with mock.patch.object(manager, method) as mock_method:
getattr(proxy, method)(ctxt, *args)
mock_method.assert_called_once_with(ctxt, *args)