Add version to scheduler rpc API.

Part of blueprint versioned-rpc-apis.

One side effect of this change was that nova.scheduler.api was removed
in favor of nova.scheduler.rpcapi.  In this case, the api was just a
direct wrapper around rpc usage.  For other APIs, I've been following
the pattern that the rpcapi module provides the rpc client wrapper, and
if any other client-side logic is needed, that's where an api module is
used.

Change-Id: Ibd0292936f9afc77aeb5d040660bfa857861eed1
This commit is contained in:
Russell Bryant
2012-05-16 16:40:05 -04:00
parent 2b85320d43
commit 6b0697d86c
7 changed files with 203 additions and 89 deletions

View File

@@ -90,6 +90,7 @@ from nova import log as logging
from nova.openstack.common import importutils from nova.openstack.common import importutils
from nova import quota from nova import quota
from nova import rpc from nova import rpc
from nova.scheduler import rpcapi as scheduler_rpcapi
from nova import utils from nova import utils
from nova import version from nova import version
from nova.volume import volume_types from nova.volume import volume_types
@@ -1021,10 +1022,9 @@ class ServiceCommands(object):
:param host: hostname. :param host: hostname.
""" """
result = rpc.call(context.get_admin_context(), rpcapi = scheduler_rpcapi.SchedulerAPI()
FLAGS.scheduler_topic, result = rpcapi.show_host_resources(context.get_admin_context(),
{"method": "show_host_resources", host=host)
"args": {"host": host}})
if not isinstance(result, dict): if not isinstance(result, dict):
print _('An unexpected error has occurred.') print _('An unexpected error has occurred.')

View File

@@ -57,7 +57,7 @@ from nova.db import base
from nova import flags from nova import flags
from nova import log as logging from nova import log as logging
from nova.rpc import dispatcher as rpc_dispatcher from nova.rpc import dispatcher as rpc_dispatcher
from nova.scheduler import api from nova.scheduler import rpcapi as scheduler_rpcapi
from nova import version from nova import version
@@ -202,6 +202,7 @@ class SchedulerDependentManager(Manager):
def __init__(self, host=None, db_driver=None, service_name='undefined'): def __init__(self, host=None, db_driver=None, service_name='undefined'):
self.last_capabilities = None self.last_capabilities = None
self.service_name = service_name self.service_name = service_name
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
super(SchedulerDependentManager, self).__init__(host, db_driver) super(SchedulerDependentManager, self).__init__(host, db_driver)
def update_service_capabilities(self, capabilities): def update_service_capabilities(self, capabilities):
@@ -213,5 +214,5 @@ class SchedulerDependentManager(Manager):
"""Pass data back to the scheduler at a periodic interval.""" """Pass data back to the scheduler at a periodic interval."""
if self.last_capabilities: if self.last_capabilities:
LOG.debug(_('Notifying Schedulers of capabilities ...')) LOG.debug(_('Notifying Schedulers of capabilities ...'))
api.update_service_capabilities(context, self.service_name, self.scheduler_rpcapi.update_service_capabilities(context,
self.host, self.last_capabilities) self.service_name, self.host, self.last_capabilities)

View File

@@ -1,72 +0,0 @@
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Handles all requests relating to schedulers.
"""
from nova import flags
from nova import log as logging
from nova import rpc
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
def _call_scheduler(method, context, params=None):
"""Generic handler for RPC calls to the scheduler.
:param params: Optional dictionary of arguments to be passed to the
scheduler worker
:retval: Result returned by scheduler worker
"""
if not params:
params = {}
queue = FLAGS.scheduler_topic
kwargs = {'method': method, 'args': params}
return rpc.call(context, queue, kwargs)
def get_host_list(context):
"""Return a list of hosts associated with this zone."""
return _call_scheduler('get_host_list', context)
def get_service_capabilities(context):
"""Return aggregated capabilities for all services."""
return _call_scheduler('get_service_capabilities', context)
def update_service_capabilities(context, service_name, host, capabilities):
"""Send an update to all the scheduler services informing them
of the capabilities of this service."""
kwargs = dict(method='update_service_capabilities',
args=dict(service_name=service_name, host=host,
capabilities=capabilities))
return rpc.fanout_cast(context, 'scheduler', kwargs)
def live_migration(context, block_migration, disk_over_commit,
instance_id, dest, topic):
"""Migrate a server to a new host"""
params = {"instance_id": instance_id,
"dest": dest,
"topic": topic,
"block_migration": block_migration,
"disk_over_commit": disk_over_commit}
# NOTE(comstud): Call vs cast so we can get exceptions back, otherwise
# this call in the scheduler driver doesn't return anything.
_call_scheduler("live_migration", context=context, params=params)

View File

@@ -51,6 +51,8 @@ QUOTAS = quota.QUOTAS
class SchedulerManager(manager.Manager): class SchedulerManager(manager.Manager):
"""Chooses a host to run instances on.""" """Chooses a host to run instances on."""
RPC_API_VERSION = '1.0'
def __init__(self, scheduler_driver=None, *args, **kwargs): def __init__(self, scheduler_driver=None, *args, **kwargs):
if not scheduler_driver: if not scheduler_driver:
scheduler_driver = FLAGS.scheduler_driver scheduler_driver = FLAGS.scheduler_driver
@@ -59,6 +61,9 @@ class SchedulerManager(manager.Manager):
def __getattr__(self, key): def __getattr__(self, key):
"""Converts all method calls to use the schedule method""" """Converts all method calls to use the schedule method"""
# NOTE(russellb) Because of what this is doing, we must be careful
# when changing the API of the scheduler drivers, as that changes
# the rpc API as well, and the version should be updated accordingly.
return functools.partial(self._schedule, key) return functools.partial(self._schedule, key)
def get_host_list(self, context): def get_host_list(self, context):

79
nova/scheduler/rpcapi.py Normal file
View File

@@ -0,0 +1,79 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Client side of the scheduler manager RPC API.
"""
from nova import flags
import nova.rpc.proxy
FLAGS = flags.FLAGS
class SchedulerAPI(nova.rpc.proxy.RpcProxy):
'''Client side of the scheduler rpc API.
API version history:
1.0 - Initial version.
'''
RPC_API_VERSION = '1.0'
def __init__(self):
super(SchedulerAPI, self).__init__(topic=FLAGS.scheduler_topic,
default_version=self.RPC_API_VERSION)
def run_instance(self, ctxt, topic, request_spec, admin_password,
injected_files, requested_networks, is_first_time,
filter_properties, call=True):
rpc_method = self.call if call else self.cast
return rpc_method(ctxt, self.make_msg('run_instance', topic=topic,
request_spec=request_spec, admin_password=admin_password,
injected_files=injected_files,
requested_networks=requested_networks,
is_first_time=is_first_time,
filter_properties=filter_properties))
def prep_resize(self, ctxt, topic, instance_uuid, instance_type_id, image,
update_db, request_spec, filter_properties):
self.cast(ctxt, self.make_msg('prep_resize', topic=topic,
instance_uuid=instance_uuid, instance_type_id=instance_type_id,
image=image, update_db=update_db, request_spec=request_spec,
filter_properties=filter_properties))
def show_host_resources(self, ctxt, host):
return self.call(ctxt, self.make_msg('show_host_resources', host=host))
def live_migration(self, ctxt, block_migration, disk_over_commit,
instance_id, dest, topic):
# NOTE(comstud): Call vs cast so we can get exceptions back, otherwise
# this call in the scheduler driver doesn't return anything.
return self.call(ctxt, self.make_msg('live_migration',
block_migration=block_migration,
disk_over_commit=disk_over_commit, instance_id=instance_id,
dest=dest, topic=topic))
def update_service_capabilities(self, ctxt, service_name, host,
capabilities):
self.fanout_cast(ctxt, self.make_msg('update_service_capabilities',
service_name=service_name, host=host,
capabilities=capabilities))
def get_host_list(self, ctxt):
return self.call(ctxt, self.make_msg('get_host_list'))

View File

@@ -0,0 +1,103 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for nova.scheduler.rpcapi
"""
from nova import context
from nova import flags
from nova import rpc
from nova.scheduler import rpcapi as scheduler_rpcapi
from nova import test
FLAGS = flags.FLAGS
class SchedulerRpcAPITestCase(test.TestCase):
def setUp(self):
super(SchedulerRpcAPITestCase, self).setUp()
def tearDown(self):
super(SchedulerRpcAPITestCase, self).tearDown()
def _test_scheduler_api(self, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
rpcapi = scheduler_rpcapi.SchedulerAPI()
expected_retval = 'foo' if method == 'call' else None
expected_msg = rpcapi.make_msg(method, **kwargs)
expected_msg['version'] = rpcapi.RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
self.fake_args = None
self.fake_kwargs = None
def _fake_rpc_method(*args, **kwargs):
self.fake_args = args
self.fake_kwargs = kwargs
if expected_retval:
return expected_retval
self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, FLAGS.scheduler_topic, expected_msg]
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
def test_run_instance_call(self):
self._test_scheduler_api('run_instance', rpc_method='call',
topic='fake_topic', request_spec='fake_request_spec',
admin_password='pw', injected_files='fake_injected_files',
requested_networks='fake_requested_networks',
is_first_time=True, filter_properties='fake_filter_properties')
def test_run_instance_cast(self):
self._test_scheduler_api('run_instance', rpc_method='cast',
topic='fake_topic', request_spec='fake_request_spec',
admin_password='pw', injected_files='fake_injected_files',
requested_networks='fake_requested_networks',
is_first_time=True, filter_properties='fake_filter_properties')
def test_prep_resize(self):
self._test_scheduler_api('prep_resize', rpc_method='cast',
topic='fake_topic', instance_uuid='fake_uuid',
instance_type_id='fake_type_id', image='fake_image',
update_db='fake_update_db', request_spec='fake_request_spec',
filter_properties='fake_props')
def test_show_host_resources(self):
self._test_scheduler_api('show_host_resources', rpc_method='call',
host='fake_host')
def test_live_migration(self):
self._test_scheduler_api('live_migration', rpc_method='call',
block_migration='fake_block_migration',
disk_over_commit='fake_disk_over_commit',
instance_id='fake_id', dest='fake_dest', topic='fake_topic')
def test_update_service_capabilities(self):
self._test_scheduler_api('update_service_capabilities',
rpc_method='fanout_cast', service_name='fake_name',
host='fake_host', capabilities='fake_capabilities')
def test_get_host_list(self):
self._test_scheduler_api('get_host_list', rpc_method='call')

View File

@@ -2807,15 +2807,14 @@ class ComputeAPITestCase(BaseTestCase):
self.compute.terminate_instance(context, instance['uuid']) self.compute.terminate_instance(context, instance['uuid'])
def test_resize_request_spec(self): def test_resize_request_spec(self):
def _fake_cast(context, args): def _fake_cast(context, topic, msg):
request_spec = args['args']['request_spec'] request_spec = msg['args']['request_spec']
filter_properties = args['args']['filter_properties'] filter_properties = msg['args']['filter_properties']
instance_properties = request_spec['instance_properties'] instance_properties = request_spec['instance_properties']
self.assertEqual(instance_properties['host'], 'host2') self.assertEqual(instance_properties['host'], 'host2')
self.assertIn('host2', filter_properties['ignore_hosts']) self.assertIn('host2', filter_properties['ignore_hosts'])
self.stubs.Set(self.compute_api, '_cast_scheduler_message', self.stubs.Set(rpc, 'cast', _fake_cast)
_fake_cast)
context = self.context.elevated() context = self.context.elevated()
instance = self._create_fake_instance(dict(host='host2')) instance = self._create_fake_instance(dict(host='host2'))
@@ -2827,15 +2826,14 @@ class ComputeAPITestCase(BaseTestCase):
self.compute.terminate_instance(context, instance['uuid']) self.compute.terminate_instance(context, instance['uuid'])
def test_resize_request_spec_noavoid(self): def test_resize_request_spec_noavoid(self):
def _fake_cast(context, args): def _fake_cast(context, topic, msg):
request_spec = args['args']['request_spec'] request_spec = msg['args']['request_spec']
filter_properties = args['args']['filter_properties'] filter_properties = msg['args']['filter_properties']
instance_properties = request_spec['instance_properties'] instance_properties = request_spec['instance_properties']
self.assertEqual(instance_properties['host'], 'host2') self.assertEqual(instance_properties['host'], 'host2')
self.assertNotIn('host2', filter_properties['ignore_hosts']) self.assertNotIn('host2', filter_properties['ignore_hosts'])
self.stubs.Set(self.compute_api, '_cast_scheduler_message', self.stubs.Set(rpc, 'cast', _fake_cast)
_fake_cast)
self.flags(allow_resize_to_same_host=True) self.flags(allow_resize_to_same_host=True)
context = self.context.elevated() context = self.context.elevated()