Add version to scheduler rpc API.

Part of blueprint versioned-rpc-apis.

One side effect of this change was that nova.scheduler.api was removed
in favor of nova.scheduler.rpcapi.  In this case, the api was just a
direct wrapper around rpc usage.  For other APIs, I've been following
the pattern that the rpcapi module provides the rpc client wrapper, and
if any other client-side logic is needed, that's where an api module is
used.

Change-Id: Ibd0292936f9afc77aeb5d040660bfa857861eed1
This commit is contained in:
Russell Bryant
2012-05-16 16:40:05 -04:00
parent 4e85b1db9f
commit 6a65c714e8
5 changed files with 194 additions and 86 deletions

View File

@@ -90,6 +90,7 @@ from nova import log as logging
from nova.openstack.common import importutils from nova.openstack.common import importutils
from nova import quota from nova import quota
from nova import rpc from nova import rpc
from nova.scheduler import rpcapi as scheduler_rpcapi
from nova import utils from nova import utils
from nova import version from nova import version
from nova.volume import volume_types from nova.volume import volume_types
@@ -1021,10 +1022,9 @@ class ServiceCommands(object):
:param host: hostname. :param host: hostname.
""" """
result = rpc.call(context.get_admin_context(), rpcapi = scheduler_rpcapi.SchedulerAPI()
FLAGS.scheduler_topic, result = rpcapi.show_host_resources(context.get_admin_context(),
{"method": "show_host_resources", host=host)
"args": {"host": host}})
if not isinstance(result, dict): if not isinstance(result, dict):
print _('An unexpected error has occurred.') print _('An unexpected error has occurred.')

View File

@@ -1,72 +0,0 @@
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Handles all requests relating to schedulers.
"""
from nova import flags
from nova import log as logging
from nova import rpc
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
def _call_scheduler(method, context, params=None):
"""Generic handler for RPC calls to the scheduler.
:param params: Optional dictionary of arguments to be passed to the
scheduler worker
:retval: Result returned by scheduler worker
"""
if not params:
params = {}
queue = FLAGS.scheduler_topic
kwargs = {'method': method, 'args': params}
return rpc.call(context, queue, kwargs)
def get_host_list(context):
"""Return a list of hosts associated with this zone."""
return _call_scheduler('get_host_list', context)
def get_service_capabilities(context):
"""Return aggregated capabilities for all services."""
return _call_scheduler('get_service_capabilities', context)
def update_service_capabilities(context, service_name, host, capabilities):
"""Send an update to all the scheduler services informing them
of the capabilities of this service."""
kwargs = dict(method='update_service_capabilities',
args=dict(service_name=service_name, host=host,
capabilities=capabilities))
return rpc.fanout_cast(context, 'scheduler', kwargs)
def live_migration(context, block_migration, disk_over_commit,
instance_id, dest, topic):
"""Migrate a server to a new host"""
params = {"instance_id": instance_id,
"dest": dest,
"topic": topic,
"block_migration": block_migration,
"disk_over_commit": disk_over_commit}
# NOTE(comstud): Call vs cast so we can get exceptions back, otherwise
# this call in the scheduler driver doesn't return anything.
_call_scheduler("live_migration", context=context, params=params)

79
nova/scheduler/rpcapi.py Normal file
View File

@@ -0,0 +1,79 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Client side of the scheduler manager RPC API.
"""
from nova import flags
import nova.rpc.proxy
FLAGS = flags.FLAGS
class SchedulerAPI(nova.rpc.proxy.RpcProxy):
'''Client side of the scheduler rpc API.
API version history:
1.0 - Initial version.
'''
RPC_API_VERSION = '1.0'
def __init__(self):
super(SchedulerAPI, self).__init__(topic=FLAGS.scheduler_topic,
default_version=self.RPC_API_VERSION)
def run_instance(self, ctxt, topic, request_spec, admin_password,
injected_files, requested_networks, is_first_time,
filter_properties, call=True):
rpc_method = self.call if call else self.cast
return rpc_method(ctxt, self.make_msg('run_instance', topic=topic,
request_spec=request_spec, admin_password=admin_password,
injected_files=injected_files,
requested_networks=requested_networks,
is_first_time=is_first_time,
filter_properties=filter_properties))
def prep_resize(self, ctxt, topic, instance_uuid, instance_type_id, image,
update_db, request_spec, filter_properties):
self.cast(ctxt, self.make_msg('prep_resize', topic=topic,
instance_uuid=instance_uuid, instance_type_id=instance_type_id,
image=image, update_db=update_db, request_spec=request_spec,
filter_properties=filter_properties))
def show_host_resources(self, ctxt, host):
return self.call(ctxt, self.make_msg('show_host_resources', host=host))
def live_migration(self, ctxt, block_migration, disk_over_commit,
instance_id, dest, topic):
# NOTE(comstud): Call vs cast so we can get exceptions back, otherwise
# this call in the scheduler driver doesn't return anything.
return self.call(ctxt, self.make_msg('live_migration',
block_migration=block_migration,
disk_over_commit=disk_over_commit, instance_id=instance_id,
dest=dest, topic=topic))
def update_service_capabilities(self, ctxt, service_name, host,
capabilities):
self.fanout_cast(ctxt, self.make_msg('update_service_capabilities',
service_name=service_name, host=host,
capabilities=capabilities))
def get_host_list(self, ctxt):
return self.call(ctxt, self.make_msg('get_host_list'))

View File

@@ -0,0 +1,103 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for nova.scheduler.rpcapi
"""
from nova import context
from nova import flags
from nova import rpc
from nova.scheduler import rpcapi as scheduler_rpcapi
from nova import test
FLAGS = flags.FLAGS
class SchedulerRpcAPITestCase(test.TestCase):
def setUp(self):
super(SchedulerRpcAPITestCase, self).setUp()
def tearDown(self):
super(SchedulerRpcAPITestCase, self).tearDown()
def _test_scheduler_api(self, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
rpcapi = scheduler_rpcapi.SchedulerAPI()
expected_retval = 'foo' if method == 'call' else None
expected_msg = rpcapi.make_msg(method, **kwargs)
expected_msg['version'] = rpcapi.RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
self.fake_args = None
self.fake_kwargs = None
def _fake_rpc_method(*args, **kwargs):
self.fake_args = args
self.fake_kwargs = kwargs
if expected_retval:
return expected_retval
self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, FLAGS.scheduler_topic, expected_msg]
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
def test_run_instance_call(self):
self._test_scheduler_api('run_instance', rpc_method='call',
topic='fake_topic', request_spec='fake_request_spec',
admin_password='pw', injected_files='fake_injected_files',
requested_networks='fake_requested_networks',
is_first_time=True, filter_properties='fake_filter_properties')
def test_run_instance_cast(self):
self._test_scheduler_api('run_instance', rpc_method='cast',
topic='fake_topic', request_spec='fake_request_spec',
admin_password='pw', injected_files='fake_injected_files',
requested_networks='fake_requested_networks',
is_first_time=True, filter_properties='fake_filter_properties')
def test_prep_resize(self):
self._test_scheduler_api('prep_resize', rpc_method='cast',
topic='fake_topic', instance_uuid='fake_uuid',
instance_type_id='fake_type_id', image='fake_image',
update_db='fake_update_db', request_spec='fake_request_spec',
filter_properties='fake_props')
def test_show_host_resources(self):
self._test_scheduler_api('show_host_resources', rpc_method='call',
host='fake_host')
def test_live_migration(self):
self._test_scheduler_api('live_migration', rpc_method='call',
block_migration='fake_block_migration',
disk_over_commit='fake_disk_over_commit',
instance_id='fake_id', dest='fake_dest', topic='fake_topic')
def test_update_service_capabilities(self):
self._test_scheduler_api('update_service_capabilities',
rpc_method='fanout_cast', service_name='fake_name',
host='fake_host', capabilities='fake_capabilities')
def test_get_host_list(self):
self._test_scheduler_api('get_host_list', rpc_method='call')

View File

@@ -2807,15 +2807,14 @@ class ComputeAPITestCase(BaseTestCase):
self.compute.terminate_instance(context, instance['uuid']) self.compute.terminate_instance(context, instance['uuid'])
def test_resize_request_spec(self): def test_resize_request_spec(self):
def _fake_cast(context, args): def _fake_cast(context, topic, msg):
request_spec = args['args']['request_spec'] request_spec = msg['args']['request_spec']
filter_properties = args['args']['filter_properties'] filter_properties = msg['args']['filter_properties']
instance_properties = request_spec['instance_properties'] instance_properties = request_spec['instance_properties']
self.assertEqual(instance_properties['host'], 'host2') self.assertEqual(instance_properties['host'], 'host2')
self.assertIn('host2', filter_properties['ignore_hosts']) self.assertIn('host2', filter_properties['ignore_hosts'])
self.stubs.Set(self.compute_api, '_cast_scheduler_message', self.stubs.Set(rpc, 'cast', _fake_cast)
_fake_cast)
context = self.context.elevated() context = self.context.elevated()
instance = self._create_fake_instance(dict(host='host2')) instance = self._create_fake_instance(dict(host='host2'))
@@ -2827,15 +2826,14 @@ class ComputeAPITestCase(BaseTestCase):
self.compute.terminate_instance(context, instance['uuid']) self.compute.terminate_instance(context, instance['uuid'])
def test_resize_request_spec_noavoid(self): def test_resize_request_spec_noavoid(self):
def _fake_cast(context, args): def _fake_cast(context, topic, msg):
request_spec = args['args']['request_spec'] request_spec = msg['args']['request_spec']
filter_properties = args['args']['filter_properties'] filter_properties = msg['args']['filter_properties']
instance_properties = request_spec['instance_properties'] instance_properties = request_spec['instance_properties']
self.assertEqual(instance_properties['host'], 'host2') self.assertEqual(instance_properties['host'], 'host2')
self.assertNotIn('host2', filter_properties['ignore_hosts']) self.assertNotIn('host2', filter_properties['ignore_hosts'])
self.stubs.Set(self.compute_api, '_cast_scheduler_message', self.stubs.Set(rpc, 'cast', _fake_cast)
_fake_cast)
self.flags(allow_resize_to_same_host=True) self.flags(allow_resize_to_same_host=True)
context = self.context.elevated() context = self.context.elevated()