Send events to all relevant hosts if migrating
Previously, external events were sent to the instance object's host field. This patch fixes the external event dispatching to check for migration. If an instance is being migrated, the source and destination compute are added to the set of hosts to which the event is sent. Change-Id: If00736ab36df4a5a3be4f02b0a550e4bcae77b1b Closes-bug: 1535918 Closes-bug: 1624052
This commit is contained in:
parent
7067ca71fb
commit
a5b920a197
@ -65,8 +65,11 @@ class ServerExternalEventsController(wsgi.Controller):
|
||||
instance = instances.get(event.instance_uuid)
|
||||
if not instance:
|
||||
try:
|
||||
# Load migration_context here in a single DB operation
|
||||
# because we need it later on
|
||||
instance = objects.Instance.get_by_uuid(
|
||||
context, event.instance_uuid)
|
||||
context, event.instance_uuid,
|
||||
expected_attrs='migration_context')
|
||||
instances[event.instance_uuid] = instance
|
||||
except exception.InstanceNotFound:
|
||||
LOG.debug('Dropping event %(name)s:%(tag)s for unknown '
|
||||
|
@ -20,6 +20,7 @@
|
||||
networking and storage of VMs, and compute hosts on which they run)."""
|
||||
|
||||
import base64
|
||||
import collections
|
||||
import copy
|
||||
import functools
|
||||
import re
|
||||
@ -3883,27 +3884,39 @@ class API(base.Base):
|
||||
# but doesn't know where they go. We need to collate lists
|
||||
# by the host the affected instance is on and dispatch them
|
||||
# according to host
|
||||
instances_by_host = {}
|
||||
events_by_host = {}
|
||||
hosts_by_instance = {}
|
||||
instances_by_host = collections.defaultdict(list)
|
||||
events_by_host = collections.defaultdict(list)
|
||||
hosts_by_instance = collections.defaultdict(list)
|
||||
for instance in instances:
|
||||
instances_on_host = instances_by_host.get(instance.host, [])
|
||||
instances_on_host.append(instance)
|
||||
instances_by_host[instance.host] = instances_on_host
|
||||
hosts_by_instance[instance.uuid] = instance.host
|
||||
for host in self._get_relevant_hosts(context, instance):
|
||||
instances_by_host[host].append(instance)
|
||||
hosts_by_instance[instance.uuid].append(host)
|
||||
|
||||
for event in events:
|
||||
host = hosts_by_instance[event.instance_uuid]
|
||||
events_on_host = events_by_host.get(host, [])
|
||||
events_on_host.append(event)
|
||||
events_by_host[host] = events_on_host
|
||||
for host in hosts_by_instance[event.instance_uuid]:
|
||||
events_by_host[host].append(event)
|
||||
|
||||
for host in instances_by_host:
|
||||
# TODO(salv-orlando): Handle exceptions raised by the rpc api layer
|
||||
# in order to ensure that a failure in processing events on a host
|
||||
# will not prevent processing events on other hosts
|
||||
self.compute_rpcapi.external_instance_event(
|
||||
context, instances_by_host[host], events_by_host[host])
|
||||
context, instances_by_host[host], events_by_host[host],
|
||||
host=host)
|
||||
|
||||
def _get_relevant_hosts(self, context, instance):
|
||||
hosts = set()
|
||||
hosts.add(instance.host)
|
||||
if instance.migration_context is not None:
|
||||
migration_id = instance.migration_context.migration_id
|
||||
migration = objects.Migration.get_by_id(context, migration_id)
|
||||
hosts.add(migration.dest_compute)
|
||||
hosts.add(migration.source_compute)
|
||||
LOG.debug('Instance %(instance)s is migrating, '
|
||||
'copying events to all relevant hosts: '
|
||||
'%(hosts)s', {'instance': instance.uuid,
|
||||
'hosts': hosts})
|
||||
return hosts
|
||||
|
||||
def get_instance_host_status(self, instance):
|
||||
if instance.host:
|
||||
|
@ -1049,10 +1049,10 @@ class ComputeAPI(object):
|
||||
volume_id=volume_id, snapshot_id=snapshot_id,
|
||||
delete_info=delete_info)
|
||||
|
||||
def external_instance_event(self, ctxt, instances, events):
|
||||
def external_instance_event(self, ctxt, instances, events, host=None):
|
||||
instance = instances[0]
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance),
|
||||
server=_compute_host(host, instance),
|
||||
version='4.0')
|
||||
cctxt.cast(ctxt, 'external_instance_event', instances=instances,
|
||||
events=events)
|
||||
|
@ -37,7 +37,7 @@ MISSING_UUID = '00000000-0000-0000-0000-000000000005'
|
||||
|
||||
|
||||
@classmethod
|
||||
def fake_get_by_uuid(cls, context, uuid):
|
||||
def fake_get_by_uuid(cls, context, uuid, **kwargs):
|
||||
try:
|
||||
return fake_instances[uuid]
|
||||
except KeyError:
|
||||
|
@ -3186,9 +3186,12 @@ class _ComputeAPIUnitTestMixIn(object):
|
||||
|
||||
def test_external_instance_event(self):
|
||||
instances = [
|
||||
objects.Instance(uuid=uuids.instance_1, host='host1'),
|
||||
objects.Instance(uuid=uuids.instance_2, host='host1'),
|
||||
objects.Instance(uuid=uuids.instance_3, host='host2'),
|
||||
objects.Instance(uuid=uuids.instance_1, host='host1',
|
||||
migration_context=None),
|
||||
objects.Instance(uuid=uuids.instance_2, host='host1',
|
||||
migration_context=None),
|
||||
objects.Instance(uuid=uuids.instance_3, host='host2',
|
||||
migration_context=None),
|
||||
]
|
||||
events = [
|
||||
objects.InstanceExternalEvent(
|
||||
@ -3202,10 +3205,61 @@ class _ComputeAPIUnitTestMixIn(object):
|
||||
self.compute_api.external_instance_event(self.context,
|
||||
instances, events)
|
||||
method = self.compute_api.compute_rpcapi.external_instance_event
|
||||
method.assert_any_call(self.context, instances[0:2], events[0:2])
|
||||
method.assert_any_call(self.context, instances[2:], events[2:])
|
||||
method.assert_any_call(self.context, instances[0:2], events[0:2],
|
||||
host='host1')
|
||||
method.assert_any_call(self.context, instances[2:], events[2:],
|
||||
host='host2')
|
||||
self.assertEqual(2, method.call_count)
|
||||
|
||||
def test_external_instance_event_evacuating_instance(self):
|
||||
# Since we're patching the db's migration_get(), use a dict here so
|
||||
# that we can validate the id is making its way correctly to the db api
|
||||
migrations = {}
|
||||
migrations[42] = {'id': 42, 'source_compute': 'host1',
|
||||
'dest_compute': 'host2', 'source_node': None,
|
||||
'dest_node': None, 'dest_host': None,
|
||||
'old_instance_type_id': None,
|
||||
'new_instance_type_id': None,
|
||||
'instance_uuid': uuids.instance_2, 'status': None,
|
||||
'migration_type': 'evacuation', 'memory_total': None,
|
||||
'memory_processed': None, 'memory_remaining': None,
|
||||
'disk_total': None, 'disk_processed': None,
|
||||
'disk_remaining': None, 'deleted': False,
|
||||
'hidden': False, 'created_at': None,
|
||||
'updated_at': None, 'deleted_at': None}
|
||||
|
||||
def migration_get(context, id):
|
||||
return migrations[id]
|
||||
|
||||
instances = [
|
||||
objects.Instance(uuid=uuids.instance_1, host='host1',
|
||||
migration_context=None),
|
||||
objects.Instance(uuid=uuids.instance_2, host='host1',
|
||||
migration_context=objects.MigrationContext(
|
||||
migration_id=42)),
|
||||
objects.Instance(uuid=uuids.instance_3, host='host2',
|
||||
migration_context=None)
|
||||
]
|
||||
events = [
|
||||
objects.InstanceExternalEvent(
|
||||
instance_uuid=uuids.instance_1),
|
||||
objects.InstanceExternalEvent(
|
||||
instance_uuid=uuids.instance_2),
|
||||
objects.InstanceExternalEvent(
|
||||
instance_uuid=uuids.instance_3),
|
||||
]
|
||||
|
||||
with mock.patch('nova.db.sqlalchemy.api.migration_get', migration_get):
|
||||
self.compute_api.compute_rpcapi = mock.MagicMock()
|
||||
self.compute_api.external_instance_event(self.context,
|
||||
instances, events)
|
||||
method = self.compute_api.compute_rpcapi.external_instance_event
|
||||
method.assert_any_call(self.context, instances[0:2], events[0:2],
|
||||
host='host1')
|
||||
method.assert_any_call(self.context, instances[1:], events[1:],
|
||||
host='host2')
|
||||
self.assertEqual(2, method.call_count)
|
||||
|
||||
def test_volume_ops_invalid_task_state(self):
|
||||
instance = self._create_instance_obj()
|
||||
self.assertEqual(instance.vm_state, vm_states.ACTIVE)
|
||||
|
Loading…
Reference in New Issue
Block a user