Files
deb-nova/nova/tests/unit/scheduler/test_rpcapi.py
Sylvain Bauza 40781fbae5 Modify Scheduler RPC API to use RequestSpec obj
Since we now have a RequestSpec object, we can provide it directly through
the RPC API select_destinations(). Since the conductor also uses the
RequestSpec object, we just need an RPC compatibility check to
see whether we can send the object directly or must backport it to dicts
because the scheduler is old.

Note: As mox was in use for the RPC API tests, I replaced it with mock
to clean up the test helper method.

Implements blueprint request-spec-object-mitaka

Change-Id: Ifd3289bf9eccab7e47cd00055b716abb8e6eb965
2016-01-06 00:02:22 +01:00

125 lines
4.7 KiB
Python

# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for nova.scheduler.rpcapi
"""
import mock
from oslo_config import cfg
from nova import context
from nova import objects
from nova.scheduler import rpcapi as scheduler_rpcapi
from nova import test
# Global oslo.config object; the tests below read ``scheduler_topic`` from it.
CONF = cfg.CONF
class SchedulerRpcAPITestCase(test.NoDBTestCase):
    """Check the calls ``SchedulerAPI`` makes on its RPC client."""

    def _test_scheduler_api(self, method, rpc_method, expected_args=None,
                            **kwargs):
        """Invoke one ``SchedulerAPI`` method and verify the RPC it issues.

        :param method: name of the ``SchedulerAPI`` method to call; the
            client is also expected to receive it as the RPC method name.
        :param rpc_method: name of the client attribute that is mocked and
            expected to be invoked (the tests use ``'call'`` and ``'cast'``).
        :param expected_args: when truthy, the keyword arguments the client
            must receive instead of ``kwargs``.
        :param kwargs: keyword arguments handed to the API method.  The
            ``version`` and ``fanout`` keys are popped beforehand and only
            describe the arguments ``prepare()`` is expected to get.
        """
        ctxt = context.RequestContext('fake_user', 'fake_project')

        api = scheduler_rpcapi.SchedulerAPI()
        self.assertIsNotNone(api.client)
        self.assertEqual(api.client.target.topic, CONF.scheduler_topic)

        # The mocked client method returns 'foo' for a 'call', None otherwise.
        if rpc_method == 'call':
            wanted_result = 'foo'
        else:
            wanted_result = None

        # These two keys steer the expectations; they are not API arguments.
        wanted_version = kwargs.pop('version', None)
        wanted_fanout = kwargs.pop('fanout', None)
        wanted_rpc_kwargs = expected_args if expected_args else kwargs.copy()

        wanted_prepare = {}
        if wanted_version:
            wanted_prepare['version'] = wanted_version
        if wanted_fanout:
            wanted_prepare['fanout'] = True

        # NOTE(sbauza): We need to persist the method before mocking it
        real_prepare = api.client.prepare

        def fake_can_send_version(version=None):
            # Answer version checks through the un-mocked prepare().
            return real_prepare(version=version).can_send_version()

        rpc_patcher = mock.patch.object(
            api.client, rpc_method, return_value=wanted_result)
        prepare_patcher = mock.patch.object(
            api.client, 'prepare', return_value=api.client)
        csv_patcher = mock.patch.object(
            api.client, 'can_send_version',
            side_effect=fake_can_send_version)

        with csv_patcher, prepare_patcher as prepare_mock, \
                rpc_patcher as rpc_mock:
            result = getattr(api, method)(ctxt, **kwargs)
            self.assertEqual(result, wanted_result)
            prepare_mock.assert_called_once_with(**wanted_prepare)
            rpc_mock.assert_called_once_with(ctxt, method,
                                             **wanted_rpc_kwargs)

    def test_select_destinations(self):
        # With default flags the RequestSpec object itself is expected to be
        # sent as ``spec_obj`` at version 4.3.
        spec = objects.RequestSpec()
        self._test_scheduler_api(
            'select_destinations',
            rpc_method='call',
            version='4.3',
            spec_obj=spec)

    @mock.patch.object(objects.RequestSpec, 'to_legacy_filter_properties_dict')
    @mock.patch.object(objects.RequestSpec, 'to_legacy_request_spec_dict')
    def test_select_destinations_with_old_manager(self, to_spec, to_props):
        # With the scheduler pinned to 4.0, the results of the two legacy
        # conversion methods are expected to be sent instead of the object.
        self.flags(scheduler='4.0', group='upgrade_levels')

        to_spec.return_value = 'fake_request_spec'
        to_props.return_value = 'fake_prop'
        legacy_kwargs = {'request_spec': 'fake_request_spec',
                         'filter_properties': 'fake_prop'}

        spec = objects.RequestSpec()
        self._test_scheduler_api(
            'select_destinations',
            rpc_method='call',
            expected_args=legacy_kwargs,
            version='4.0',
            spec_obj=spec)

    def test_update_aggregates(self):
        # Fanout cast, expected at version 4.1.
        self._test_scheduler_api(
            'update_aggregates',
            rpc_method='cast',
            fanout=True,
            version='4.1',
            aggregates='aggregates')

    def test_delete_aggregate(self):
        # Fanout cast, expected at version 4.1.
        self._test_scheduler_api(
            'delete_aggregate',
            rpc_method='cast',
            fanout=True,
            version='4.1',
            aggregate='aggregate')

    def test_update_instance_info(self):
        # Fanout cast, expected at version 4.2.
        self._test_scheduler_api(
            'update_instance_info',
            rpc_method='cast',
            fanout=True,
            version='4.2',
            host_name='fake_host',
            instance_info='fake_instance')

    def test_delete_instance_info(self):
        # Fanout cast, expected at version 4.2.
        self._test_scheduler_api(
            'delete_instance_info',
            rpc_method='cast',
            fanout=True,
            version='4.2',
            host_name='fake_host',
            instance_uuid='fake_uuid')

    def test_sync_instance_info(self):
        # Fanout cast, expected at version 4.2.
        self._test_scheduler_api(
            'sync_instance_info',
            rpc_method='cast',
            fanout=True,
            version='4.2',
            host_name='fake_host',
            instance_uuids=['fake1', 'fake2'])