In this branch we are forwarding incoming requests to child zones when the requested resource is not found in the current zone.
For example: If 'nova pause 123' is issued against Zone 1, but instance 123 does not live in Zone 1, the call will be forwarded to all child zones hoping someone can deal with it. NOTE: This currently only works with OpenStack API requests and routing checks are only being done against Compute/instance_id checks. Specifically: * servers.get/pause/unpause/diagnostics/suspend/resume/rescue/unrescue/delete * servers.create is pending for distributed scheduler * servers.get_all will get added early in Diablo. What I've been doing for testing: 1. Set up a Nova deployment in a VM (Zone0) 2. Clone the VM and set --zone_name=zone1 (and change all the IP addresses to the new address in nova.conf, glance.conf and novarc) 3. Set --enable_zone_routing=true on all zones 4. use the --connection_type=fake driver for compute to keep things easy 5. Add Zone1 as a child of Zone0 (nova zone-add) (make sure the instance id's are different in each zone) Example of calls being sent to child zones: http://paste.openstack.org/show/964/
This commit is contained in:
@@ -17,11 +17,21 @@
|
||||
Handles all requests relating to schedulers.
|
||||
"""
|
||||
|
||||
import novaclient
|
||||
|
||||
from nova import db
|
||||
from nova import exception
|
||||
from nova import flags
|
||||
from nova import log as logging
|
||||
from nova import rpc
|
||||
|
||||
from eventlet import greenpool
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
flags.DEFINE_bool('enable_zone_routing',
|
||||
False,
|
||||
'When True, routing to child zones will occur.')
|
||||
|
||||
LOG = logging.getLogger('nova.scheduler.api')
|
||||
|
||||
|
||||
@@ -45,9 +55,27 @@ def get_zone_list(context):
|
||||
items = _call_scheduler('get_zone_list', context)
|
||||
for item in items:
|
||||
item['api_url'] = item['api_url'].replace('\\/', '/')
|
||||
if not items:
|
||||
items = db.zone_get_all(context)
|
||||
return items
|
||||
|
||||
|
||||
def zone_get(context, zone_id):
|
||||
return db.zone_get(context, zone_id)
|
||||
|
||||
|
||||
def zone_delete(context, zone_id):
|
||||
return db.zone_delete(context, zone_id)
|
||||
|
||||
|
||||
def zone_create(context, data):
|
||||
return db.zone_create(context, data)
|
||||
|
||||
|
||||
def zone_update(context, zone_id, data):
|
||||
return db.zone_update(context, zone_id, data)
|
||||
|
||||
|
||||
def get_zone_capabilities(context, service=None):
|
||||
"""Returns a dict of key, value capabilities for this zone,
|
||||
or for a particular class of services running in this zone."""
|
||||
@@ -62,3 +90,152 @@ def update_service_capabilities(context, service_name, host, capabilities):
|
||||
args=dict(service_name=service_name, host=host,
|
||||
capabilities=capabilities))
|
||||
return rpc.fanout_cast(context, 'scheduler', kwargs)
|
||||
|
||||
|
||||
def _wrap_method(function, self):
|
||||
"""Wrap method to supply self."""
|
||||
def _wrap(*args, **kwargs):
|
||||
return function(self, *args, **kwargs)
|
||||
return _wrap
|
||||
|
||||
|
||||
def _process(func, zone):
|
||||
"""Worker stub for green thread pool. Give the worker
|
||||
an authenticated nova client and zone info."""
|
||||
nova = novaclient.OpenStack(zone.username, zone.password, zone.api_url)
|
||||
nova.authenticate()
|
||||
return func(nova, zone)
|
||||
|
||||
|
||||
def child_zone_helper(zone_list, func):
|
||||
"""Fire off a command to each zone in the list.
|
||||
The return is [novaclient return objects] from each child zone.
|
||||
For example, if you are calling server.pause(), the list will
|
||||
be whatever the response from server.pause() is. One entry
|
||||
per child zone called."""
|
||||
green_pool = greenpool.GreenPool()
|
||||
return [result for result in green_pool.imap(
|
||||
_wrap_method(_process, func), zone_list)]
|
||||
|
||||
|
||||
def _issue_novaclient_command(nova, zone, collection, method_name, item_id):
|
||||
"""Use novaclient to issue command to a single child zone.
|
||||
One of these will be run in parallel for each child zone."""
|
||||
manager = getattr(nova, collection)
|
||||
result = None
|
||||
try:
|
||||
try:
|
||||
result = manager.get(int(item_id))
|
||||
except ValueError, e:
|
||||
result = manager.find(name=item_id)
|
||||
except novaclient.NotFound:
|
||||
url = zone.api_url
|
||||
LOG.debug(_("%(collection)s '%(item_id)s' not found on '%(url)s'" %
|
||||
locals()))
|
||||
return None
|
||||
|
||||
if method_name.lower() not in ['get', 'find']:
|
||||
result = getattr(result, method_name)()
|
||||
return result
|
||||
|
||||
|
||||
def wrap_novaclient_function(f, collection, method_name, item_id):
|
||||
"""Appends collection, method_name and item_id to the incoming
|
||||
(nova, zone) call from child_zone_helper."""
|
||||
def inner(nova, zone):
|
||||
return f(nova, zone, collection, method_name, item_id)
|
||||
|
||||
return inner
|
||||
|
||||
|
||||
class RedirectResult(exception.Error):
|
||||
"""Used to the HTTP API know that these results are pre-cooked
|
||||
and they can be returned to the caller directly."""
|
||||
def __init__(self, results):
|
||||
self.results = results
|
||||
super(RedirectResult, self).__init__(
|
||||
message=_("Uncaught Zone redirection exception"))
|
||||
|
||||
|
||||
class reroute_compute(object):
|
||||
"""Decorator used to indicate that the method should
|
||||
delegate the call the child zones if the db query
|
||||
can't find anything."""
|
||||
def __init__(self, method_name):
|
||||
self.method_name = method_name
|
||||
|
||||
def __call__(self, f):
|
||||
def wrapped_f(*args, **kwargs):
|
||||
collection, context, item_id = \
|
||||
self.get_collection_context_and_id(args, kwargs)
|
||||
try:
|
||||
# Call the original function ...
|
||||
return f(*args, **kwargs)
|
||||
except exception.InstanceNotFound, e:
|
||||
LOG.debug(_("Instance %(item_id)s not found "
|
||||
"locally: '%(e)s'" % locals()))
|
||||
|
||||
if not FLAGS.enable_zone_routing:
|
||||
raise
|
||||
|
||||
zones = db.zone_get_all(context)
|
||||
if not zones:
|
||||
raise
|
||||
|
||||
# Ask the children to provide an answer ...
|
||||
LOG.debug(_("Asking child zones ..."))
|
||||
result = self._call_child_zones(zones,
|
||||
wrap_novaclient_function(_issue_novaclient_command,
|
||||
collection, self.method_name, item_id))
|
||||
# Scrub the results and raise another exception
|
||||
# so the API layers can bail out gracefully ...
|
||||
raise RedirectResult(self.unmarshall_result(result))
|
||||
return wrapped_f
|
||||
|
||||
def _call_child_zones(self, zones, function):
|
||||
"""Ask the child zones to perform this operation.
|
||||
Broken out for testing."""
|
||||
return child_zone_helper(zones, function)
|
||||
|
||||
def get_collection_context_and_id(self, args, kwargs):
|
||||
"""Returns a tuple of (novaclient collection name, security
|
||||
context and resource id. Derived class should override this."""
|
||||
context = kwargs.get('context', None)
|
||||
instance_id = kwargs.get('instance_id', None)
|
||||
if len(args) > 0 and not context:
|
||||
context = args[1]
|
||||
if len(args) > 1 and not instance_id:
|
||||
instance_id = args[2]
|
||||
return ("servers", context, instance_id)
|
||||
|
||||
def unmarshall_result(self, zone_responses):
|
||||
"""Result is a list of responses from each child zone.
|
||||
Each decorator derivation is responsible to turning this
|
||||
into a format expected by the calling method. For
|
||||
example, this one is expected to return a single Server
|
||||
dict {'server':{k:v}}. Others may return a list of them, like
|
||||
{'servers':[{k,v}]}"""
|
||||
reduced_response = []
|
||||
for zone_response in zone_responses:
|
||||
if not zone_response:
|
||||
continue
|
||||
|
||||
server = zone_response.__dict__
|
||||
|
||||
for k in server.keys():
|
||||
if k[0] == '_' or k == 'manager':
|
||||
del server[k]
|
||||
|
||||
reduced_response.append(dict(server=server))
|
||||
if reduced_response:
|
||||
return reduced_response[0] # first for now.
|
||||
return {}
|
||||
|
||||
|
||||
def redirect_handler(f):
|
||||
def new_f(*args, **kwargs):
|
||||
try:
|
||||
return f(*args, **kwargs)
|
||||
except RedirectResult, e:
|
||||
return e.results
|
||||
return new_f
|
||||
|
||||
@@ -58,8 +58,9 @@ class ZoneState(object):
|
||||
child zone."""
|
||||
self.last_seen = datetime.now()
|
||||
self.attempt = 0
|
||||
self.name = zone_metadata["name"]
|
||||
self.capabilities = zone_metadata["capabilities"]
|
||||
self.name = zone_metadata.get("name", "n/a")
|
||||
self.capabilities = ", ".join(["%s=%s" % (k, v)
|
||||
for k, v in zone_metadata.iteritems() if k != 'name'])
|
||||
self.is_active = True
|
||||
|
||||
def to_dict(self):
|
||||
@@ -104,7 +105,7 @@ class ZoneManager(object):
|
||||
"""Keeps the zone states updated."""
|
||||
def __init__(self):
|
||||
self.last_zone_db_check = datetime.min
|
||||
self.zone_states = {}
|
||||
self.zone_states = {} # { <zone_id> : ZoneState }
|
||||
self.service_states = {} # { <service> : { <host> : { cap k : v }}}
|
||||
self.green_pool = greenpool.GreenPool()
|
||||
|
||||
|
||||
@@ -21,6 +21,9 @@ Tests For Scheduler
|
||||
|
||||
import datetime
|
||||
import mox
|
||||
import novaclient.exceptions
|
||||
import stubout
|
||||
import webob
|
||||
|
||||
from mox import IgnoreArg
|
||||
from nova import context
|
||||
@@ -32,6 +35,7 @@ from nova import test
|
||||
from nova import rpc
|
||||
from nova import utils
|
||||
from nova.auth import manager as auth_manager
|
||||
from nova.scheduler import api
|
||||
from nova.scheduler import manager
|
||||
from nova.scheduler import driver
|
||||
from nova.compute import power_state
|
||||
@@ -937,3 +941,160 @@ class SimpleDriverTestCase(test.TestCase):
|
||||
db.instance_destroy(self.context, instance_id)
|
||||
db.service_destroy(self.context, s_ref['id'])
|
||||
db.service_destroy(self.context, s_ref2['id'])
|
||||
|
||||
|
||||
class FakeZone(object):
|
||||
def __init__(self, api_url, username, password):
|
||||
self.api_url = api_url
|
||||
self.username = username
|
||||
self.password = password
|
||||
|
||||
|
||||
def zone_get_all(context):
|
||||
return [
|
||||
FakeZone('http://example.com', 'bob', 'xxx'),
|
||||
]
|
||||
|
||||
|
||||
class FakeRerouteCompute(api.reroute_compute):
|
||||
def _call_child_zones(self, zones, function):
|
||||
return []
|
||||
|
||||
def get_collection_context_and_id(self, args, kwargs):
|
||||
return ("servers", None, 1)
|
||||
|
||||
def unmarshall_result(self, zone_responses):
|
||||
return dict(magic="found me")
|
||||
|
||||
|
||||
def go_boom(self, context, instance):
|
||||
raise exception.InstanceNotFound("boom message", instance)
|
||||
|
||||
|
||||
def found_instance(self, context, instance):
|
||||
return dict(name='myserver')
|
||||
|
||||
|
||||
class FakeResource(object):
|
||||
def __init__(self, attribute_dict):
|
||||
for k, v in attribute_dict.iteritems():
|
||||
setattr(self, k, v)
|
||||
|
||||
def pause(self):
|
||||
pass
|
||||
|
||||
|
||||
class ZoneRedirectTest(test.TestCase):
|
||||
def setUp(self):
|
||||
super(ZoneRedirectTest, self).setUp()
|
||||
self.stubs = stubout.StubOutForTesting()
|
||||
|
||||
self.stubs.Set(db, 'zone_get_all', zone_get_all)
|
||||
|
||||
self.enable_zone_routing = FLAGS.enable_zone_routing
|
||||
FLAGS.enable_zone_routing = True
|
||||
|
||||
def tearDown(self):
|
||||
self.stubs.UnsetAll()
|
||||
FLAGS.enable_zone_routing = self.enable_zone_routing
|
||||
super(ZoneRedirectTest, self).tearDown()
|
||||
|
||||
def test_trap_found_locally(self):
|
||||
decorator = FakeRerouteCompute("foo")
|
||||
try:
|
||||
result = decorator(found_instance)(None, None, 1)
|
||||
except api.RedirectResult, e:
|
||||
self.fail(_("Successful database hit should succeed"))
|
||||
|
||||
def test_trap_not_found_locally(self):
|
||||
decorator = FakeRerouteCompute("foo")
|
||||
try:
|
||||
result = decorator(go_boom)(None, None, 1)
|
||||
self.assertFail(_("Should have rerouted."))
|
||||
except api.RedirectResult, e:
|
||||
self.assertEquals(e.results['magic'], 'found me')
|
||||
|
||||
def test_routing_flags(self):
|
||||
FLAGS.enable_zone_routing = False
|
||||
decorator = FakeRerouteCompute("foo")
|
||||
try:
|
||||
result = decorator(go_boom)(None, None, 1)
|
||||
self.assertFail(_("Should have thrown exception."))
|
||||
except exception.InstanceNotFound, e:
|
||||
self.assertEquals(e.message, 'boom message')
|
||||
|
||||
def test_get_collection_context_and_id(self):
|
||||
decorator = api.reroute_compute("foo")
|
||||
self.assertEquals(decorator.get_collection_context_and_id(
|
||||
(None, 10, 20), {}), ("servers", 10, 20))
|
||||
self.assertEquals(decorator.get_collection_context_and_id(
|
||||
(None, 11,), dict(instance_id=21)), ("servers", 11, 21))
|
||||
self.assertEquals(decorator.get_collection_context_and_id(
|
||||
(None,), dict(context=12, instance_id=22)), ("servers", 12, 22))
|
||||
|
||||
def test_unmarshal_single_server(self):
|
||||
decorator = api.reroute_compute("foo")
|
||||
self.assertEquals(decorator.unmarshall_result([]), {})
|
||||
self.assertEquals(decorator.unmarshall_result(
|
||||
[FakeResource(dict(a=1, b=2)), ]),
|
||||
dict(server=dict(a=1, b=2)))
|
||||
self.assertEquals(decorator.unmarshall_result(
|
||||
[FakeResource(dict(a=1, _b=2)), ]),
|
||||
dict(server=dict(a=1,)))
|
||||
self.assertEquals(decorator.unmarshall_result(
|
||||
[FakeResource(dict(a=1, manager=2)), ]),
|
||||
dict(server=dict(a=1,)))
|
||||
self.assertEquals(decorator.unmarshall_result(
|
||||
[FakeResource(dict(_a=1, manager=2)), ]),
|
||||
dict(server={}))
|
||||
|
||||
|
||||
class FakeServerCollection(object):
|
||||
def get(self, instance_id):
|
||||
return FakeResource(dict(a=10, b=20))
|
||||
|
||||
def find(self, name):
|
||||
return FakeResource(dict(a=11, b=22))
|
||||
|
||||
|
||||
class FakeEmptyServerCollection(object):
|
||||
def get(self, f):
|
||||
raise novaclient.NotFound(1)
|
||||
|
||||
def find(self, name):
|
||||
raise novaclient.NotFound(2)
|
||||
|
||||
|
||||
class FakeNovaClient(object):
|
||||
def __init__(self, collection):
|
||||
self.servers = collection
|
||||
|
||||
|
||||
class DynamicNovaClientTest(test.TestCase):
|
||||
def test_issue_novaclient_command_found(self):
|
||||
zone = FakeZone('http://example.com', 'bob', 'xxx')
|
||||
self.assertEquals(api._issue_novaclient_command(
|
||||
FakeNovaClient(FakeServerCollection()),
|
||||
zone, "servers", "get", 100).a, 10)
|
||||
|
||||
self.assertEquals(api._issue_novaclient_command(
|
||||
FakeNovaClient(FakeServerCollection()),
|
||||
zone, "servers", "find", "name").b, 22)
|
||||
|
||||
self.assertEquals(api._issue_novaclient_command(
|
||||
FakeNovaClient(FakeServerCollection()),
|
||||
zone, "servers", "pause", 100), None)
|
||||
|
||||
def test_issue_novaclient_command_not_found(self):
|
||||
zone = FakeZone('http://example.com', 'bob', 'xxx')
|
||||
self.assertEquals(api._issue_novaclient_command(
|
||||
FakeNovaClient(FakeEmptyServerCollection()),
|
||||
zone, "servers", "get", 100), None)
|
||||
|
||||
self.assertEquals(api._issue_novaclient_command(
|
||||
FakeNovaClient(FakeEmptyServerCollection()),
|
||||
zone, "servers", "find", "name"), None)
|
||||
|
||||
self.assertEquals(api._issue_novaclient_command(
|
||||
FakeNovaClient(FakeEmptyServerCollection()),
|
||||
zone, "servers", "any", "name"), None)
|
||||
|
||||
Reference in New Issue
Block a user