Merge "Send events to all relevant hosts if migrating"
This commit is contained in:
commit
0412225890
@ -65,8 +65,11 @@ class ServerExternalEventsController(wsgi.Controller):
|
||||
instance = instances.get(event.instance_uuid)
|
||||
if not instance:
|
||||
try:
|
||||
# Load migration_context here in a single DB operation
|
||||
# because we need it later on
|
||||
instance = objects.Instance.get_by_uuid(
|
||||
context, event.instance_uuid)
|
||||
context, event.instance_uuid,
|
||||
expected_attrs='migration_context')
|
||||
instances[event.instance_uuid] = instance
|
||||
except exception.InstanceNotFound:
|
||||
LOG.debug('Dropping event %(name)s:%(tag)s for unknown '
|
||||
|
@ -20,6 +20,7 @@
|
||||
networking and storage of VMs, and compute hosts on which they run)."""
|
||||
|
||||
import base64
|
||||
import collections
|
||||
import copy
|
||||
import functools
|
||||
import re
|
||||
@ -3883,27 +3884,39 @@ class API(base.Base):
|
||||
# but doesn't know where they go. We need to collate lists
|
||||
# by the host the affected instance is on and dispatch them
|
||||
# according to host
|
||||
instances_by_host = {}
|
||||
events_by_host = {}
|
||||
hosts_by_instance = {}
|
||||
instances_by_host = collections.defaultdict(list)
|
||||
events_by_host = collections.defaultdict(list)
|
||||
hosts_by_instance = collections.defaultdict(list)
|
||||
for instance in instances:
|
||||
instances_on_host = instances_by_host.get(instance.host, [])
|
||||
instances_on_host.append(instance)
|
||||
instances_by_host[instance.host] = instances_on_host
|
||||
hosts_by_instance[instance.uuid] = instance.host
|
||||
for host in self._get_relevant_hosts(context, instance):
|
||||
instances_by_host[host].append(instance)
|
||||
hosts_by_instance[instance.uuid].append(host)
|
||||
|
||||
for event in events:
|
||||
host = hosts_by_instance[event.instance_uuid]
|
||||
events_on_host = events_by_host.get(host, [])
|
||||
events_on_host.append(event)
|
||||
events_by_host[host] = events_on_host
|
||||
for host in hosts_by_instance[event.instance_uuid]:
|
||||
events_by_host[host].append(event)
|
||||
|
||||
for host in instances_by_host:
|
||||
# TODO(salv-orlando): Handle exceptions raised by the rpc api layer
|
||||
# in order to ensure that a failure in processing events on a host
|
||||
# will not prevent processing events on other hosts
|
||||
self.compute_rpcapi.external_instance_event(
|
||||
context, instances_by_host[host], events_by_host[host])
|
||||
context, instances_by_host[host], events_by_host[host],
|
||||
host=host)
|
||||
|
||||
def _get_relevant_hosts(self, context, instance):
|
||||
hosts = set()
|
||||
hosts.add(instance.host)
|
||||
if instance.migration_context is not None:
|
||||
migration_id = instance.migration_context.migration_id
|
||||
migration = objects.Migration.get_by_id(context, migration_id)
|
||||
hosts.add(migration.dest_compute)
|
||||
hosts.add(migration.source_compute)
|
||||
LOG.debug('Instance %(instance)s is migrating, '
|
||||
'copying events to all relevant hosts: '
|
||||
'%(hosts)s', {'instance': instance.uuid,
|
||||
'hosts': hosts})
|
||||
return hosts
|
||||
|
||||
def get_instance_host_status(self, instance):
|
||||
if instance.host:
|
||||
|
@ -1049,10 +1049,10 @@ class ComputeAPI(object):
|
||||
volume_id=volume_id, snapshot_id=snapshot_id,
|
||||
delete_info=delete_info)
|
||||
|
||||
def external_instance_event(self, ctxt, instances, events):
|
||||
def external_instance_event(self, ctxt, instances, events, host=None):
|
||||
instance = instances[0]
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance),
|
||||
server=_compute_host(host, instance),
|
||||
version='4.0')
|
||||
cctxt.cast(ctxt, 'external_instance_event', instances=instances,
|
||||
events=events)
|
||||
|
@ -37,7 +37,7 @@ MISSING_UUID = '00000000-0000-0000-0000-000000000005'
|
||||
|
||||
|
||||
@classmethod
|
||||
def fake_get_by_uuid(cls, context, uuid):
|
||||
def fake_get_by_uuid(cls, context, uuid, **kwargs):
|
||||
try:
|
||||
return fake_instances[uuid]
|
||||
except KeyError:
|
||||
|
@ -3187,9 +3187,12 @@ class _ComputeAPIUnitTestMixIn(object):
|
||||
|
||||
def test_external_instance_event(self):
|
||||
instances = [
|
||||
objects.Instance(uuid=uuids.instance_1, host='host1'),
|
||||
objects.Instance(uuid=uuids.instance_2, host='host1'),
|
||||
objects.Instance(uuid=uuids.instance_3, host='host2'),
|
||||
objects.Instance(uuid=uuids.instance_1, host='host1',
|
||||
migration_context=None),
|
||||
objects.Instance(uuid=uuids.instance_2, host='host1',
|
||||
migration_context=None),
|
||||
objects.Instance(uuid=uuids.instance_3, host='host2',
|
||||
migration_context=None),
|
||||
]
|
||||
events = [
|
||||
objects.InstanceExternalEvent(
|
||||
@ -3203,10 +3206,61 @@ class _ComputeAPIUnitTestMixIn(object):
|
||||
self.compute_api.external_instance_event(self.context,
|
||||
instances, events)
|
||||
method = self.compute_api.compute_rpcapi.external_instance_event
|
||||
method.assert_any_call(self.context, instances[0:2], events[0:2])
|
||||
method.assert_any_call(self.context, instances[2:], events[2:])
|
||||
method.assert_any_call(self.context, instances[0:2], events[0:2],
|
||||
host='host1')
|
||||
method.assert_any_call(self.context, instances[2:], events[2:],
|
||||
host='host2')
|
||||
self.assertEqual(2, method.call_count)
|
||||
|
||||
def test_external_instance_event_evacuating_instance(self):
|
||||
# Since we're patching the db's migration_get(), use a dict here so
|
||||
# that we can validate the id is making its way correctly to the db api
|
||||
migrations = {}
|
||||
migrations[42] = {'id': 42, 'source_compute': 'host1',
|
||||
'dest_compute': 'host2', 'source_node': None,
|
||||
'dest_node': None, 'dest_host': None,
|
||||
'old_instance_type_id': None,
|
||||
'new_instance_type_id': None,
|
||||
'instance_uuid': uuids.instance_2, 'status': None,
|
||||
'migration_type': 'evacuation', 'memory_total': None,
|
||||
'memory_processed': None, 'memory_remaining': None,
|
||||
'disk_total': None, 'disk_processed': None,
|
||||
'disk_remaining': None, 'deleted': False,
|
||||
'hidden': False, 'created_at': None,
|
||||
'updated_at': None, 'deleted_at': None}
|
||||
|
||||
def migration_get(context, id):
|
||||
return migrations[id]
|
||||
|
||||
instances = [
|
||||
objects.Instance(uuid=uuids.instance_1, host='host1',
|
||||
migration_context=None),
|
||||
objects.Instance(uuid=uuids.instance_2, host='host1',
|
||||
migration_context=objects.MigrationContext(
|
||||
migration_id=42)),
|
||||
objects.Instance(uuid=uuids.instance_3, host='host2',
|
||||
migration_context=None)
|
||||
]
|
||||
events = [
|
||||
objects.InstanceExternalEvent(
|
||||
instance_uuid=uuids.instance_1),
|
||||
objects.InstanceExternalEvent(
|
||||
instance_uuid=uuids.instance_2),
|
||||
objects.InstanceExternalEvent(
|
||||
instance_uuid=uuids.instance_3),
|
||||
]
|
||||
|
||||
with mock.patch('nova.db.sqlalchemy.api.migration_get', migration_get):
|
||||
self.compute_api.compute_rpcapi = mock.MagicMock()
|
||||
self.compute_api.external_instance_event(self.context,
|
||||
instances, events)
|
||||
method = self.compute_api.compute_rpcapi.external_instance_event
|
||||
method.assert_any_call(self.context, instances[0:2], events[0:2],
|
||||
host='host1')
|
||||
method.assert_any_call(self.context, instances[1:], events[1:],
|
||||
host='host2')
|
||||
self.assertEqual(2, method.call_count)
|
||||
|
||||
def test_volume_ops_invalid_task_state(self):
|
||||
instance = self._create_instance_obj()
|
||||
self.assertEqual(instance.vm_state, vm_states.ACTIVE)
|
||||
|
Loading…
Reference in New Issue
Block a user