Merge "Allocate networks in the background"

This commit is contained in:
Jenkins 2013-06-07 05:05:18 +00:00 committed by Gerrit Code Review
commit 7a475d3cd6
4 changed files with 143 additions and 19 deletions

View File

@ -959,6 +959,9 @@ class ComputeManager(manager.SchedulerDependentManager):
except exception.InstanceNotFound: except exception.InstanceNotFound:
# the instance got deleted during the spawn # the instance got deleted during the spawn
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
# Make sure the async call finishes
if network_info is not None:
network_info.wait(do_raise=False)
try: try:
self._deallocate_network(context, instance) self._deallocate_network(context, instance)
except Exception: except Exception:
@ -966,6 +969,10 @@ class ComputeManager(manager.SchedulerDependentManager):
'for deleted instance') 'for deleted instance')
LOG.exception(msg, instance=instance) LOG.exception(msg, instance=instance)
except exception.UnexpectedTaskStateError as e: except exception.UnexpectedTaskStateError as e:
exc_info = sys.exc_info()
# Make sure the async call finishes
if network_info is not None:
network_info.wait(do_raise=False)
actual_task_state = e.kwargs.get('actual', None) actual_task_state = e.kwargs.get('actual', None)
if actual_task_state == 'deleting': if actual_task_state == 'deleting':
msg = _('Instance was deleted during spawn.') msg = _('Instance was deleted during spawn.')
@ -973,10 +980,13 @@ class ComputeManager(manager.SchedulerDependentManager):
raise exception.BuildAbortException( raise exception.BuildAbortException(
instance_uuid=instance['uuid'], reason=msg) instance_uuid=instance['uuid'], reason=msg)
else: else:
raise raise exc_info[0], exc_info[1], exc_info[2]
except Exception: except Exception:
exc_info = sys.exc_info() exc_info = sys.exc_info()
# try to re-schedule instance: # try to re-schedule instance:
# Make sure the async call finishes
if network_info is not None:
network_info.wait(do_raise=False)
rescheduled = self._reschedule_or_error(context, instance, rescheduled = self._reschedule_or_error(context, instance,
exc_info, requested_networks, admin_password, exc_info, requested_networks, admin_password,
injected_files_orig, is_first_time, request_spec, injected_files_orig, is_first_time, request_spec,
@ -1111,15 +1121,24 @@ class ComputeManager(manager.SchedulerDependentManager):
def _allocate_network(self, context, instance, requested_networks, macs, def _allocate_network(self, context, instance, requested_networks, macs,
security_groups): security_groups):
"""Allocate networks for an instance and return the network info.""" """Start network allocation asynchronously. Return an instance
of NetworkInfoAsyncWrapper that can be used to retrieve the
allocated networks when the operation has finished.
"""
# NOTE(comstud): Since we're allocating networks asynchronously,
# this task state has little meaning, as we won't be in this
# state for very long.
instance = self._instance_update(context, instance['uuid'], instance = self._instance_update(context, instance['uuid'],
vm_state=vm_states.BUILDING, vm_state=vm_states.BUILDING,
task_state=task_states.NETWORKING, task_state=task_states.NETWORKING,
expected_task_state=None) expected_task_state=None)
is_vpn = pipelib.is_vpn_image(instance['image_ref']) is_vpn = pipelib.is_vpn_image(instance['image_ref'])
def async_alloc():
LOG.debug(_("Allocating IP information in the background."),
instance=instance)
try: try:
# allocate and get network info nwinfo = self.network_api.allocate_for_instance(
network_info = self.network_api.allocate_for_instance(
context, instance, vpn=is_vpn, context, instance, vpn=is_vpn,
requested_networks=requested_networks, requested_networks=requested_networks,
macs=macs, macs=macs,
@ -1129,11 +1148,10 @@ class ComputeManager(manager.SchedulerDependentManager):
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
LOG.exception(_('Instance failed network setup'), LOG.exception(_('Instance failed network setup'),
instance=instance) instance=instance)
LOG.debug(_('Instance network_info: |%s|'), nwinfo,
LOG.debug(_('Instance network_info: |%s|'), network_info,
instance=instance) instance=instance)
return nwinfo
return network_info return network_model.NetworkInfoAsyncWrapper(async_alloc)
def _prep_block_device(self, context, instance, bdms): def _prep_block_device(self, context, instance, bdms):
"""Set up the block device for an instance with error logging.""" """Set up the block device for an instance with error logging."""

View File

@ -15,6 +15,9 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import functools
import eventlet
import netaddr import netaddr
from nova import exception from nova import exception
@ -434,3 +437,75 @@ class NetworkInfo(list):
network_info.append((network_dict, info_dict)) network_info.append((network_dict, info_dict))
return network_info return network_info
class NetworkInfoAsyncWrapper(NetworkInfo):
"""Wrapper around NetworkInfo that allows retrieving NetworkInfo
in an async manner.
This allows one to start querying for network information before
you know you will need it. If you have a long-running
operation, this allows the network model retrieval to occur in the
background. When you need the data, it will ensure the async
operation has completed.
As an example:
def allocate_net_info(arg1, arg2)
return call_quantum_to_allocate(arg1, arg2)
network_info = NetworkInfoAsyncWrapper(allocate_net_info, arg1, arg2)
[do a long running operation -- real network_info will be retrieved
in the background]
[do something with network_info]
"""
def __init__(self, async_method, *args, **kwargs):
self._gt = eventlet.spawn(async_method, *args, **kwargs)
methods = ['json', 'legacy', 'fixed_ips', 'floating_ips']
for method in methods:
fn = getattr(self, method)
wrapper = functools.partial(self._sync_wrapper, fn)
functools.update_wrapper(wrapper, fn)
setattr(self, method, wrapper)
def _sync_wrapper(self, wrapped, *args, **kwargs):
"""Synchronize the model before running a method."""
self.wait()
return wrapped(*args, **kwargs)
def __getitem__(self, *args, **kwargs):
fn = super(NetworkInfoAsyncWrapper, self).__getitem__
return self._sync_wrapper(fn, *args, **kwargs)
def __iter__(self, *args, **kwargs):
fn = super(NetworkInfoAsyncWrapper, self).__iter__
return self._sync_wrapper(fn, *args, **kwargs)
def __len__(self, *args, **kwargs):
fn = super(NetworkInfoAsyncWrapper, self).__len__
return self._sync_wrapper(fn, *args, **kwargs)
def __str__(self, *args, **kwargs):
fn = super(NetworkInfoAsyncWrapper, self).__str__
return self._sync_wrapper(fn, *args, **kwargs)
def __repr__(self, *args, **kwargs):
fn = super(NetworkInfoAsyncWrapper, self).__repr__
return self._sync_wrapper(fn, *args, **kwargs)
def wait(self, do_raise=True):
"""Wait for async call to finish."""
if self._gt is not None:
try:
# NOTE(comstud): This looks funky, but this object is
# subclassed from list. In other words, 'self' is really
# just a list with a bunch of extra methods. So this
# line just replaces the current list (which should be
# empty) with the result.
self[:] = self._gt.wait()
except Exception:
if do_raise:
raise
finally:
self._gt = None

View File

@ -406,8 +406,11 @@ def set_stub_network_methods(stubs):
def fake_networkinfo(*args, **kwargs): def fake_networkinfo(*args, **kwargs):
return network_model.NetworkInfo() return network_model.NetworkInfo()
def fake_async_networkinfo(*args, **kwargs):
return network_model.NetworkInfoAsyncWrapper(fake_networkinfo)
stubs.Set(cm, '_get_instance_nw_info', fake_networkinfo) stubs.Set(cm, '_get_instance_nw_info', fake_networkinfo)
stubs.Set(cm, '_allocate_network', fake_networkinfo) stubs.Set(cm, '_allocate_network', fake_async_networkinfo)
stubs.Set(cm, '_deallocate_network', lambda *args, **kwargs: None) stubs.Set(cm, '_deallocate_network', lambda *args, **kwargs: None)

View File

@ -332,6 +332,34 @@ class NetworkInfoTests(test.TestCase):
fake_network_cache_model.new_ip( fake_network_cache_model.new_ip(
{'address': '10.10.0.3'})] * 4) {'address': '10.10.0.3'})] * 4)
def test_create_async_model(self):
def async_wrapper():
return model.NetworkInfo(
[fake_network_cache_model.new_vif(),
fake_network_cache_model.new_vif(
{'address': 'bb:bb:bb:bb:bb:bb'})])
ninfo = model.NetworkInfoAsyncWrapper(async_wrapper)
self.assertEqual(ninfo.fixed_ips(),
[fake_network_cache_model.new_ip({'address': '10.10.0.2'}),
fake_network_cache_model.new_ip(
{'address': '10.10.0.3'})] * 4)
def test_create_async_model_exceptions(self):
def async_wrapper():
raise test.TestingException()
ninfo = model.NetworkInfoAsyncWrapper(async_wrapper)
self.assertRaises(test.TestingException, ninfo.wait)
# 2nd one doesn't raise
self.assertEqual(None, ninfo.wait())
# Test that do_raise=False works on .wait()
ninfo = model.NetworkInfoAsyncWrapper(async_wrapper)
self.assertEqual(None, ninfo.wait(do_raise=False))
# Test we also raise calling a method
ninfo = model.NetworkInfoAsyncWrapper(async_wrapper)
self.assertRaises(test.TestingException, ninfo.fixed_ips)
def test_get_floating_ips(self): def test_get_floating_ips(self):
vif = fake_network_cache_model.new_vif() vif = fake_network_cache_model.new_vif()
vif['network']['subnets'][0]['ips'][0].add_floating_ip('192.168.1.1') vif['network']['subnets'][0]['ips'][0].add_floating_ip('192.168.1.1')