Modify Scheduler RPC API to use RequestSpec obj

Since we now have a RequestSpec object, we can directly provide it thru
the RPC API select_destinations(). Since the conductor also uses the
RequestSpec object, we just need to have a RPC compatibility check to
see whether we can directly send the object or backport it to dicts if
the scheduler is old.

Note: As mox was in use for the RPC API tests, I replaced it by mock
for cleaning up the test helper method.

Implements blueprint request-spec-object-mitaka

Change-Id: Ifd3289bf9eccab7e47cd00055b716abb8e6eb965
This commit is contained in:
Sylvain Bauza 2015-07-07 19:33:49 +02:00
parent 21f6f7b63a
commit 40781fbae5
12 changed files with 140 additions and 125 deletions

View File

@ -25,7 +25,6 @@ from oslo_config import cfg
from nova import exception
from nova.i18n import _
from nova import objects
from nova.scheduler import driver
CONF = cfg.CONF
@ -58,14 +57,8 @@ class ChanceScheduler(driver.Scheduler):
return random.choice(hosts)
def select_destinations(self, context, request_spec, filter_properties):
def select_destinations(self, context, spec_obj):
"""Selects random destinations."""
# TODO(sbauza): Change the select_destinations method to accept a
# RequestSpec object directly (and add a new RPC API method for passing
# a RequestSpec object over the wire)
spec_obj = objects.RequestSpec.from_primitives(context,
request_spec,
filter_properties)
num_instances = spec_obj.num_instances
# NOTE(timello): Returns a list of dicts with 'host', 'nodename' and
# 'limits' as keys for compatibility with filter_scheduler.

View File

@ -13,8 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_serialization import jsonutils
from nova.scheduler import rpcapi as scheduler_rpcapi
@ -31,20 +29,7 @@ class SchedulerQueryClient(object):
The result should be a list of dicts with 'host', 'nodename' and
'limits' as keys.
"""
# TODO(sbauza): Provide directly the RequestSpec object as an arg
# once the RPC API is modified for that.
request_spec = spec_obj.to_legacy_request_spec_dict()
filter_properties = spec_obj.to_legacy_filter_properties_dict()
# FIXME(sbauza): Serialize/Unserialize the legacy dict because of
# oslo.messaging #1529084 to transform datetime values into strings.
# tl;dr: datetimes in dicts are not accepted as correct values by the
# rpc fake driver.
# will be removed in the next patch of that series.
# Yeah, that's an ugly hack I know, but that's only for not squashing
# both commits.
request_spec = jsonutils.loads(jsonutils.dumps(request_spec))
return self.scheduler_rpcapi.select_destinations(context, request_spec,
filter_properties)
return self.scheduler_rpcapi.select_destinations(context, spec_obj)
def update_aggregates(self, context, aggregates):
"""Updates HostManager internal aggregates information.

View File

@ -53,7 +53,7 @@ class Scheduler(object):
if self.servicegroup_api.service_is_up(service)]
@abc.abstractmethod
def select_destinations(self, context, request_spec, filter_properties):
def select_destinations(self, context, spec_obj):
"""Must override select_destinations method.
:return: A list of dicts with 'host', 'nodename' and 'limits' as keys

View File

@ -27,7 +27,6 @@ from six.moves import range
import nova.conf
from nova import exception
from nova.i18n import _
from nova import objects
from nova import rpc
from nova.scheduler import driver
from nova.scheduler import scheduler_options
@ -44,14 +43,8 @@ class FilterScheduler(driver.Scheduler):
self.options = scheduler_options.SchedulerOptions()
self.notifier = rpc.get_notifier('scheduler')
def select_destinations(self, context, request_spec, filter_properties):
def select_destinations(self, context, spec_obj):
"""Selects a filtered set of hosts and nodes."""
# TODO(sbauza): Change the select_destinations method to accept a
# RequestSpec object directly (and add a new RPC API method for passing
# a RequestSpec object over the wire)
spec_obj = objects.RequestSpec.from_primitives(context,
request_spec,
filter_properties)
self.notifier.info(
context, 'scheduler.select_destinations.start',
dict(request_spec=spec_obj.to_legacy_request_spec_dict()))

View File

@ -27,6 +27,7 @@ from oslo_utils import importutils
import nova.conf
from nova import exception
from nova import manager
from nova import objects
from nova import quota
@ -38,7 +39,9 @@ QUOTAS = quota.QUOTAS
class SchedulerManager(manager.Manager):
"""Chooses a host to run instances on."""
target = messaging.Target(version='4.2')
target = messaging.Target(version='4.3')
_sentinel = object()
def __init__(self, scheduler_driver=None, *args, **kwargs):
if not scheduler_driver:
@ -57,15 +60,22 @@ class SchedulerManager(manager.Manager):
self.driver.run_periodic_tasks(context)
@messaging.expected_exceptions(exception.NoValidHost)
def select_destinations(self, context, request_spec, filter_properties):
"""Returns destinations(s) best suited for this request_spec and
filter_properties.
def select_destinations(self, ctxt,
request_spec=None, filter_properties=None,
spec_obj=_sentinel):
"""Returns destinations(s) best suited for this RequestSpec.
The result should be a list of dicts with 'host', 'nodename' and
'limits' as keys.
"""
dests = self.driver.select_destinations(context, request_spec,
filter_properties)
# TODO(sbauza): Change the method signature to only accept a spec_obj
# argument once API v5 is provided.
if spec_obj is self._sentinel:
spec_obj = objects.RequestSpec.from_primitives(ctxt,
request_spec,
filter_properties)
dests = self.driver.select_destinations(ctxt, spec_obj)
return jsonutils.to_primitive(dests)
def update_aggregates(self, ctxt, aggregates):

View File

@ -85,6 +85,9 @@ class SchedulerAPI(object):
done such that they can handle the version_cap being set to
4.2.
* 4.3 - Modify select_destinations() signature by providing a
RequestSpec obj
'''
VERSION_ALIASES = {
@ -105,10 +108,17 @@ class SchedulerAPI(object):
self.client = rpc.get_client(target, version_cap=version_cap,
serializer=serializer)
def select_destinations(self, ctxt, request_spec, filter_properties):
cctxt = self.client.prepare(version='4.0')
return cctxt.call(ctxt, 'select_destinations',
request_spec=request_spec, filter_properties=filter_properties)
def select_destinations(self, ctxt, spec_obj):
version = '4.3'
msg_args = {'spec_obj': spec_obj}
if not self.client.can_send_version(version):
del msg_args['spec_obj']
msg_args['request_spec'] = spec_obj.to_legacy_request_spec_dict()
msg_args['filter_properties'
] = spec_obj.to_legacy_filter_properties_dict()
version = '4.0'
cctxt = self.client.prepare(version=version)
return cctxt.call(ctxt, 'select_destinations', **msg_args)
def update_aggregates(self, ctxt, aggregates):
# NOTE(sbauza): Yes, it's a fanout, we need to update all schedulers

View File

@ -75,29 +75,29 @@ class CachingSchedulerTestCase(test_scheduler.SchedulerTestCase):
self.assertEqual(mock_get_hosts.return_value, result)
def test_select_destination_raises_with_no_hosts(self):
fake_request_spec = self._get_fake_request_spec()
spec_obj = self._get_fake_request_spec()
self.driver.all_host_states = []
self.assertRaises(exception.NoValidHost,
self.driver.select_destinations,
self.context, fake_request_spec, {})
self.context, spec_obj)
@mock.patch('nova.db.instance_extra_get_by_instance_uuid',
return_value={'numa_topology': None,
'pci_requests': None})
def test_select_destination_works(self, mock_get_extra):
fake_request_spec = self._get_fake_request_spec()
spec_obj = self._get_fake_request_spec()
fake_host = self._get_fake_host_state()
self.driver.all_host_states = [fake_host]
result = self._test_select_destinations(fake_request_spec)
result = self._test_select_destinations(spec_obj)
self.assertEqual(1, len(result))
self.assertEqual(result[0]["host"], fake_host.host)
def _test_select_destinations(self, request_spec):
def _test_select_destinations(self, spec_obj):
return self.driver.select_destinations(
self.context, request_spec, {})
self.context, spec_obj)
def _get_fake_request_spec(self):
# NOTE(sbauza): Prevent to stub the Flavor.get_by_id call just by
@ -113,17 +113,22 @@ class CachingSchedulerTestCase(test_scheduler.SchedulerTestCase):
instance_properties = {
"os_type": "linux",
"project_id": "1234",
"memory_mb": 512,
"root_gb": 1,
"ephemeral_gb": 1,
"vcpus": 1,
"uuid": 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa',
}
request_spec = {
"instance_type": flavor,
"instance_properties": instance_properties,
"num_instances": 1,
}
request_spec = objects.RequestSpec(
flavor=flavor,
num_instances=1,
ignore_hosts=None,
force_hosts=None,
force_nodes=None,
retry=None,
availability_zone=None,
image=None,
instance_group=None,
pci_requests=None,
numa_topology=None,
instance_uuid='aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa',
**instance_properties
)
return request_spec
def _get_fake_host_state(self, index=0):
@ -152,7 +157,7 @@ class CachingSchedulerTestCase(test_scheduler.SchedulerTestCase):
self.flags(service_down_time=240)
request_spec = self._get_fake_request_spec()
spec_obj = self._get_fake_request_spec()
host_states = []
for x in range(hosts):
host_state = self._get_fake_host_state(x)
@ -164,7 +169,7 @@ class CachingSchedulerTestCase(test_scheduler.SchedulerTestCase):
for x in range(requests):
self.driver.select_destinations(
self.context, request_spec, {})
self.context, spec_obj)
b = timeutils.utcnow()
c = b - a

View File

@ -57,7 +57,7 @@ class ChanceSchedulerTestCase(test_scheduler.SchedulerTestCase):
def test_select_destinations(self):
ctxt = context.RequestContext('fake', 'fake', False)
ctxt_elevated = 'fake-context-elevated'
request_spec = {'num_instances': 2}
spec_obj = objects.RequestSpec(num_instances=2, ignore_hosts=None)
self.mox.StubOutWithMock(ctxt, 'elevated')
self.mox.StubOutWithMock(self.driver, 'hosts_up')
@ -74,7 +74,7 @@ class ChanceSchedulerTestCase(test_scheduler.SchedulerTestCase):
random.choice(hosts_full).AndReturn('host2')
self.mox.ReplayAll()
dests = self.driver.select_destinations(ctxt, request_spec, {})
dests = self.driver.select_destinations(ctxt, spec_obj)
self.assertEqual(2, len(dests))
(host, node) = (dests[0]['host'], dests[0]['nodename'])
self.assertEqual('host3', host)
@ -94,7 +94,7 @@ class ChanceSchedulerTestCase(test_scheduler.SchedulerTestCase):
self.stubs.Set(self.driver, '_filter_hosts', _return_no_host)
self.mox.ReplayAll()
request_spec = {'num_instances': 1}
spec_obj = objects.RequestSpec(num_instances=1)
self.assertRaises(exception.NoValidHost,
self.driver.select_destinations, self.context,
request_spec, {})
spec_obj)

View File

@ -62,20 +62,15 @@ class SchedulerQueryClientTestCase(test.NoDBTestCase):
def test_constructor(self):
self.assertIsNotNone(self.client.scheduler_rpcapi)
@mock.patch.object(objects.RequestSpec, 'to_legacy_filter_properties_dict')
@mock.patch.object(objects.RequestSpec, 'to_legacy_request_spec_dict')
@mock.patch.object(scheduler_rpcapi.SchedulerAPI, 'select_destinations')
def test_select_destinations(self, mock_select_destinations, to_spec,
to_props):
def test_select_destinations(self, mock_select_destinations):
fake_spec = objects.RequestSpec()
to_spec.return_value = 'fake_request_spec'
to_props.return_value = 'fake_props'
self.client.select_destinations(
context=self.context,
spec_obj=fake_spec
)
mock_select_destinations.assert_called_once_with(
self.context, 'fake_request_spec', 'fake_props')
self.context, fake_spec)
@mock.patch.object(scheduler_rpcapi.SchedulerAPI, 'update_aggregates')
def test_update_aggregates(self, mock_update_aggs):

View File

@ -272,20 +272,20 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
self.stubs.Set(weights.HostWeightHandler,
'get_weighed_objects', _fake_weigh_objects)
request_spec = {'instance_type': objects.Flavor(memory_mb=512,
root_gb=512,
ephemeral_gb=0,
vcpus=1),
'instance_properties': {'project_id': 1,
'root_gb': 512,
'memory_mb': 512,
'ephemeral_gb': 0,
'vcpus': 1,
'os_type': 'Linux',
'uuid': 'fake-uuid'},
'num_instances': 1}
spec_obj = objects.RequestSpec(
flavor=objects.Flavor(memory_mb=512,
root_gb=512,
ephemeral_gb=0,
vcpus=1),
project_id=1,
os_type='Linux',
instance_uuid='fake-uuid',
num_instances=1,
pci_requests=None,
numa_topology=None,
instance_group=None)
self.mox.ReplayAll()
dests = self.driver.select_destinations(self.context, request_spec, {})
dests = self.driver.select_destinations(self.context, spec_obj)
(host, node) = (dests[0]['host'], dests[0]['nodename'])
self.assertEqual(host, selected_hosts[0])
self.assertEqual(node, selected_nodes[0])
@ -295,29 +295,20 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
mock_schedule.return_value = [mock.Mock()]
with mock.patch.object(self.driver.notifier, 'info') as mock_info:
request_spec = {'num_instances': 1,
'instance_properties': {'project_id': '1',
'root_gb': 512,
'memory_mb': 512,
'ephemeral_gb': 0,
'vcpus': 1,
'uuid': '1',
'pci_requests': None,
'availability_zone': None,
'numa_topology': None},
'instance_type': objects.Flavor(memory_mb=512,
root_gb=512,
ephemeral_gb=0,
vcpus=1),
'image': {'properties': {}}}
expected = {'num_instances': 1,
'instance_properties': {'uuid': 'uuid1'},
'instance_type': {},
'image': {}}
spec_obj = objects.RequestSpec(num_instances=1,
instance_uuid='uuid1')
self.driver.select_destinations(self.context, request_spec, {})
self.driver.select_destinations(self.context, spec_obj)
expected = [
mock.call(self.context, 'scheduler.select_destinations.start',
dict(request_spec=request_spec)),
dict(request_spec=expected)),
mock.call(self.context, 'scheduler.select_destinations.end',
dict(request_spec=request_spec))]
dict(request_spec=expected))]
self.assertEqual(expected, mock_info.call_args_list)
def test_select_destinations_no_valid_host(self):
@ -328,7 +319,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
self.stubs.Set(self.driver, '_schedule', _return_no_host)
self.assertRaises(exception.NoValidHost,
self.driver.select_destinations, self.context,
{'num_instances': 1}, {})
objects.RequestSpec(num_instances=1))
def test_select_destinations_no_valid_host_not_enough(self):
# Tests that we have fewer hosts available than number of instances
@ -338,7 +329,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
return_value=consumed_hosts):
try:
self.driver.select_destinations(
self.context, {'num_instances': 3}, {})
self.context, objects.RequestSpec(num_instances=3))
self.fail('Expected NoValidHost to be raised.')
except exception.NoValidHost as e:
# Make sure that we provided a reason why NoValidHost.

View File

@ -16,10 +16,11 @@
Unit Tests for nova.scheduler.rpcapi
"""
from mox3 import mox
import mock
from oslo_config import cfg
from nova import context
from nova import objects
from nova.scheduler import rpcapi as scheduler_rpcapi
from nova import test
@ -27,7 +28,8 @@ CONF = cfg.CONF
class SchedulerRpcAPITestCase(test.NoDBTestCase):
def _test_scheduler_api(self, method, rpc_method, **kwargs):
def _test_scheduler_api(self, method, rpc_method, expected_args=None,
**kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
rpcapi = scheduler_rpcapi.SchedulerAPI()
@ -39,34 +41,53 @@ class SchedulerRpcAPITestCase(test.NoDBTestCase):
expected_fanout = kwargs.pop('fanout', None)
expected_kwargs = kwargs.copy()
self.mox.StubOutWithMock(rpcapi, 'client')
rpcapi.client.can_send_version(
mox.IsA(str)).MultipleTimes().AndReturn(True)
if expected_args:
expected_kwargs = expected_args
prepare_kwargs = {}
if expected_fanout:
prepare_kwargs['fanout'] = True
if expected_version:
prepare_kwargs['version'] = expected_version
rpcapi.client.prepare(**prepare_kwargs).AndReturn(rpcapi.client)
rpc_method = getattr(rpcapi.client, rpc_method)
# NOTE(sbauza): We need to persist the method before mocking it
orig_prepare = rpcapi.client.prepare
rpc_method(ctxt, method, **expected_kwargs).AndReturn(expected_retval)
def fake_can_send_version(version=None):
return orig_prepare(version=version).can_send_version()
self.mox.ReplayAll()
# NOTE(markmc): MultipleTimes() is OnceOrMore() not ZeroOrMore()
rpcapi.client.can_send_version('I fool you mox')
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
@mock.patch.object(rpcapi.client, rpc_method,
return_value=expected_retval)
@mock.patch.object(rpcapi.client, 'prepare',
return_value=rpcapi.client)
@mock.patch.object(rpcapi.client, 'can_send_version',
side_effect=fake_can_send_version)
def do_test(mock_csv, mock_prepare, mock_rpc_method):
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
mock_prepare.assert_called_once_with(**prepare_kwargs)
mock_rpc_method.assert_called_once_with(ctxt, method,
**expected_kwargs)
do_test()
def test_select_destinations(self):
fake_spec = objects.RequestSpec()
self._test_scheduler_api('select_destinations', rpc_method='call',
request_spec='fake_request_spec',
filter_properties='fake_prop',
spec_obj=fake_spec,
version='4.3')
@mock.patch.object(objects.RequestSpec, 'to_legacy_filter_properties_dict')
@mock.patch.object(objects.RequestSpec, 'to_legacy_request_spec_dict')
def test_select_destinations_with_old_manager(self, to_spec, to_props):
self.flags(scheduler='4.0', group='upgrade_levels')
to_spec.return_value = 'fake_request_spec'
to_props.return_value = 'fake_prop'
fake_spec = objects.RequestSpec()
self._test_scheduler_api('select_destinations', rpc_method='call',
expected_args={'request_spec': 'fake_request_spec',
'filter_properties': 'fake_prop'},
spec_obj=fake_spec,
version='4.0')
def test_update_aggregates(self):

View File

@ -55,10 +55,22 @@ class SchedulerManagerTestCase(test.NoDBTestCase):
self.assertIsInstance(manager.driver, self.driver_cls)
def test_select_destination(self):
fake_spec = objects.RequestSpec()
with mock.patch.object(self.manager.driver, 'select_destinations'
) as select_destinations:
self.manager.select_destinations(None, None, {})
select_destinations.assert_called_once_with(None, None, {})
self.manager.select_destinations(None, spec_obj=fake_spec)
select_destinations.assert_called_once_with(None, fake_spec)
# TODO(sbauza): Remove that test once the API v4 is removed
@mock.patch.object(objects.RequestSpec, 'from_primitives')
def test_select_destination_with_old_client(self, from_primitives):
fake_spec = objects.RequestSpec()
from_primitives.return_value = fake_spec
with mock.patch.object(self.manager.driver, 'select_destinations'
) as select_destinations:
self.manager.select_destinations(None, request_spec='fake_spec',
filter_properties='fake_props')
select_destinations.assert_called_once_with(None, fake_spec)
def test_update_aggregates(self):
with mock.patch.object(self.manager.driver.host_manager,