From c92edb8a177de51862ad2a4f9cbac2c50d31ef84 Mon Sep 17 00:00:00 2001
From: Pierre Riteau
Date: Wed, 8 Aug 2018 12:46:28 +0200
Subject: [PATCH] Prevent conflicting events from running concurrently

If two leases have compute hosts in common, and the second lease starts
exactly when the first lease ends, there is the possibility of a race.
The Blazar manager can first run the start_lease event of the second
lease. This event would fail since the end_lease event of the first
lease would still be UNDONE, and the compute hosts in common would still
be in the aggregate associated with the first lease, instead of being in
the freepool.

This patch changes event execution code so that events are executed
concurrently if possible, with the following constraints:

- batches of events are executed strictly one after the other, i.e. a
  batch is started only after all events of the previous batch have
  completed
- when events are at the same time, we first execute before_end_lease
  events, then end_lease events (in both cases unless the same lease has
  a start_lease event at the same time), followed by start_lease events,
  ensuring the bug described above does not happen. Finally, we run any
  before_end_lease and then any end_lease events whose lease had a
  start_lease event at the same time.

It also has the side effect of providing better stack traces for event
execution failures, since we call wait() on all GreenThread objects.

Co-Authored-By: Jason Anderson
Change-Id: Ie2339db18e8baee379fbea082f1238ec44fca6b1
Closes-Bug: #1785841
---
 blazar/manager/service.py            | 117 ++++++++++++++++++++++----
 blazar/tests/manager/test_service.py | 120 ++++++++++++++++++++++-----
 2 files changed, 204 insertions(+), 33 deletions(-)

diff --git a/blazar/manager/service.py b/blazar/manager/service.py
index b0717e94..fe2a58ea 100644
--- a/blazar/manager/service.py
+++ b/blazar/manager/service.py
@@ -13,7 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from collections import defaultdict
 import datetime
+from operator import itemgetter
 
 import eventlet
 from oslo_config import cfg
@@ -77,7 +79,7 @@ class ManagerService(service_utils.RPCServer):
         # NOTE(jakecoll): stop_on_exception=False was added because database
         # exceptions would prevent threads from being scheduled again.
         # TODO(jakecoll): Find a way to test this.
-        self.tg.add_timer_args(EVENT_INTERVAL, self._event,
+        self.tg.add_timer_args(EVENT_INTERVAL, self._process_events,
                                stop_on_exception=False)
         for m in self.monitors:
             m.start_monitoring()
@@ -139,25 +141,18 @@ class ManagerService(service_utils.RPCServer):
         return actions
 
     @service_utils.with_empty_context
-    def _event(self):
-        """Tries to commit event.
-
-        If there is an event in Blazar DB to be done, do it and change its
-        status to 'DONE'.
-        """
-        LOG.debug('Trying to get event from DB.')
-        events = db_api.event_get_all_sorted_by_filters(
-            sort_key='time',
-            sort_dir='asc',
-            filters={'status': status.event.UNDONE,
-                     'time': {'op': 'le',
-                              'border': datetime.datetime.utcnow()}}
-        )
-
+    def _process_events_concurrently(self, events):
+        """Run a batch of events concurrently and wait for all of them.
+
+        Each event is run in its own green thread, unless the status of its
+        lease is not stable, in which case the event is skipped. An event
+        whose green thread cannot be spawned, or raises, is marked as ERROR.
+        """
         if not events:
             return
 
         LOG.info("Trying to execute events: %s", events)
+        event_threads = {}
         for event in events:
             if not status.LeaseStatus.is_stable(event['lease_id']):
                 LOG.info("Skip event %s because the status of the lease %s "
@@ -166,15 +161,101 @@ class ManagerService(service_utils.RPCServer):
             db_api.event_update(event['id'],
                                 {'status': status.event.IN_PROGRESS})
             try:
-                eventlet.spawn_n(
+                event_thread = eventlet.spawn(
                     service_utils.with_empty_context(self._exec_event),
                     event)
+                event_threads[event['id']] = event_thread
             except Exception:
                 db_api.event_update(event['id'],
                                     {'status': status.event.ERROR})
-                LOG.exception('Error occurred while event %s handling.',
+                LOG.exception('Error occurred while spawning event %s.',
                               event['id'])
 
+        # Wait for all green threads, so that the caller only moves on to the
+        # next batch of events once every event of this batch has completed.
+        for event_id, event_thread in event_threads.items():
+            try:
+                event_thread.wait()
+            except Exception:
+                db_api.event_update(event_id,
+                                    {'status': status.event.ERROR})
+                LOG.exception('Error occurred while handling event %s.',
+                              event_id)
+
+    def _select_for_execution(self, events):
+        """Group events into batches that can safely be run concurrently.
+
+        Events are grouped by type. The returned batches are meant to be
+        executed one after the other, in the following order:
+
+        1. before_end_lease events,
+        2. end_lease events,
+        3. start_lease events,
+        4. before_end_lease events of leases which also have a start_lease
+           event in ``events``,
+        5. end_lease events of leases which also have a start_lease event in
+           ``events``.
+
+        We ensure that:
+
+        - the before_end_lease event of a lease is executed after the
+          start_lease event and before the end_lease event of the same lease,
+        - for two reservations using the same hosts back to back, the end_lease
+          event is executed before the start_lease event.
+
+        :param events: events to order, as dicts with at least the 'time',
+                       'lease_id' and 'event_type' keys
+        :returns: a list of five lists of events, or an empty list if there
+                  are no events. Events of any other type are not returned.
+        """
+        if not events:
+            return []
+
+        events_by_lease = defaultdict(list)
+        events_by_type = defaultdict(list)
+
+        for e in sorted(events, key=itemgetter('time')):
+            events_by_lease[e['lease_id']].append(e)
+            events_by_type[e['event_type']].append(e)
+
+        # If there is a start_lease event for the same lease, we run it first.
+        deferred_before_end_events = []
+        deferred_end_events = []
+        for start_event in events_by_type['start_lease']:
+            for e in events_by_lease[start_event['lease_id']]:
+                if e['event_type'] == 'before_end_lease':
+                    events_by_type['before_end_lease'].remove(e)
+                    deferred_before_end_events.append(e)
+                elif e['event_type'] == 'end_lease':
+                    events_by_type['end_lease'].remove(e)
+                    deferred_end_events.append(e)
+
+        return [
+            events_by_type['before_end_lease'],
+            events_by_type['end_lease'],
+            events_by_type['start_lease'],
+            deferred_before_end_events,
+            deferred_end_events
+        ]
+
+    def _process_events(self):
+        """Tries to execute events.
+
+        If there is any event in Blazar DB to be executed, do it and change its
+        status to 'DONE'. Events are executed concurrently if possible.
+        """
+        LOG.debug('Trying to get events from DB.')
+        events = db_api.event_get_all_sorted_by_filters(
+            sort_key='time',
+            sort_dir='asc',
+            filters={'status': status.event.UNDONE,
+                     'time': {'op': 'le',
+                              'border': datetime.datetime.utcnow()}}
+        )
+
+        for batch in self._select_for_execution(events):
+            self._process_events_concurrently(batch)
+
     def _exec_event(self, event):
         """Execute an event function"""
         event_fn = getattr(self, event['event_type'], None)
diff --git a/blazar/tests/manager/test_service.py b/blazar/tests/manager/test_service.py
index d416bc19..2ecc53cd 100644
--- a/blazar/tests/manager/test_service.py
+++ b/blazar/tests/manager/test_service.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import copy
 import datetime
 from unittest import mock
 
@@ -235,36 +236,124 @@ class ServiceTestCase(tests.TestCase):
         event_update = self.patch(self.db_api, 'event_update')
         events.return_value = None
 
-        self.manager._event()
+        self.manager._process_events()
 
         self.assertFalse(event_update.called)
 
     def test_event_success(self):
         events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
         event_update = self.patch(self.db_api, 'event_update')
-        events.return_value = [{'id': '111-222-333',
-                                'lease_id': 'lease_id1',
-                                'time': self.good_date},
-                               {'id': '444-555-666',
-                                'lease_id': 'lease_id2',
-                                'time': self.good_date}]
-        self.patch(eventlet, 'spawn_n')
+        events.return_value = [{'id': '111-222-333', 'time': self.good_date,
+                                'lease_id': 'aaa-bbb-ccc',
+                                'event_type': 'start_lease'},
+                               {'id': '444-555-666', 'time': self.good_date,
+                                'lease_id': 'bbb-ccc-ddd',
+                                'event_type': 'start_lease'}]
+        self.patch(eventlet, 'spawn')
 
-        self.manager._event()
+        self.manager._process_events()
 
         event_update.assert_has_calls([
             mock.call('111-222-333', {'status': status.event.IN_PROGRESS}),
             mock.call('444-555-666', {'status': status.event.IN_PROGRESS})])
 
+    def test_concurrent_events(self):
+        events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
+        self.patch(self.db_api, 'event_update')
+        events.return_value = [{'id': '111-222-333', 'time': self.good_date,
+                                'lease_id': 'aaa-bbb-ccc',
+                                'event_type': 'start_lease'},
+                               {'id': '222-333-444', 'time': self.good_date,
+                                'lease_id': 'bbb-ccc-ddd',
+                                'event_type': 'end_lease'},
+                               {'id': '333-444-555', 'time': self.good_date,
+                                'lease_id': 'bbb-ccc-ddd',
+                                'event_type': 'before_end_lease'},
+                               {'id': '444-555-666', 'time': self.good_date,
+                                # Same lease as start_lease event above
+                                'lease_id': 'aaa-bbb-ccc',
+                                'event_type': 'before_end_lease'},
+                               {'id': '777-888-999', 'time': self.good_date,
+                                # Same lease as start_lease event above
+                                'lease_id': 'aaa-bbb-ccc',
+                                'event_type': 'end_lease'},
+                               {'id': '555-666-777', 'time': self.good_date,
+                                'lease_id': 'ccc-ddd-eee',
+                                'event_type': 'end_lease'},
+                               {'id': '666-777-888',
+                                'time': self.good_date + datetime.timedelta(
+                                    minutes=1),
+                                'lease_id': 'ddd-eee-fff',
+                                'event_type': 'end_lease'}]
+        events_values = copy.copy(events.return_value)
+        _process_events_concurrently = self.patch(
+            self.manager, '_process_events_concurrently')
+
+        self.manager._process_events()
+        _process_events_concurrently.assert_has_calls([
+            # First execute the before_end_lease event which doesn't have a
+            # corresponding start_lease
+            mock.call([events_values[2]]),
+            # Then end_lease events
+            mock.call([events_values[1], events_values[5], events_values[6]]),
+            # Then the start_lease event
+            mock.call([events_values[0]]),
+            # Then the before_end_lease which is for the same lease as the
+            # previous start_lease event
+            mock.call([events_values[3]]),
+            # Then the end_lease which is for the same lease as the previous
+            # start_lease event
+            mock.call([events_values[4]])])
+
+    def test_process_events_concurrently(self):
+        events = [{'id': '111-222-333', 'time': self.good_date,
+                   'lease_id': 'aaa-bbb-ccc',
+                   'event_type': 'start_lease'},
+                  {'id': '222-333-444', 'time': self.good_date,
+                   'lease_id': 'bbb-ccc-ddd',
+                   'event_type': 'start_lease'},
+                  {'id': '333-444-555', 'time': self.good_date,
+                   'lease_id': 'ccc-ddd-eee',
+                   'event_type': 'start_lease'}]
+        spawn = self.patch(eventlet, 'spawn')
+
+        self.manager._process_events_concurrently(events)
+        spawn.assert_has_calls([
+            mock.call(mock.ANY, events[0]),
+            mock.call(mock.ANY, events[1]),
+            mock.call(mock.ANY, events[2])])
+
+    def test_process_events_concurrently_wait_fail(self):
+        events = [{'id': '111-222-333', 'time': self.good_date,
+                   'lease_id': 'aaa-bbb-ccc',
+                   'event_type': 'start_lease'},
+                  {'id': '222-333-444', 'time': self.good_date,
+                   'lease_id': 'bbb-ccc-ddd',
+                   'event_type': 'start_lease'}]
+        event_update = self.patch(self.db_api, 'event_update')
+        failing_thread = mock.Mock()
+        failing_thread.wait.side_effect = Exception
+        spawn = self.patch(eventlet, 'spawn')
+        spawn.side_effect = [failing_thread, mock.Mock()]
+
+        self.manager._process_events_concurrently(events)
+
+        # Only the event whose green thread failed is marked as ERROR
+        event_update.assert_has_calls([
+            mock.call('111-222-333', {'status': status.event.IN_PROGRESS}),
+            mock.call('222-333-444', {'status': status.event.IN_PROGRESS}),
+            mock.call('111-222-333', {'status': status.event.ERROR})])
+        self.assertEqual(3, event_update.call_count)
+
     def test_event_spawn_fail(self):
         events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
         event_update = self.patch(self.db_api, 'event_update')
-        self.patch(eventlet, 'spawn_n').side_effect = Exception
-        events.return_value = [{'id': '111-222-333',
-                                'lease_id': self.lease_id,
-                                'time': self.good_date}]
+        self.patch(eventlet, 'spawn').side_effect = Exception
+        events.return_value = [{'id': '111-222-333', 'time': self.good_date,
+                                'lease_id': 'aaa-bbb-ccc',
+                                'event_type': 'start_lease'}]
 
-        self.manager._event()
+        self.manager._process_events()
 
         event_update.assert_has_calls([
             mock.call('111-222-333', {'status': status.event.IN_PROGRESS}),
@@ -274,6 +363,7 @@ class ServiceTestCase(tests.TestCase):
         events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
         events.return_value = [{'id': '111-222-333',
                                 'lease_id': self.lease_id,
+                                'event_type': 'start_lease',
                                 'time': self.good_date}]
 
         self.lease_get = self.patch(self.db_api, 'lease_get')
@@ -283,7 +373,7 @@ class ServiceTestCase(tests.TestCase):
 
         event_update = self.patch(self.db_api, 'event_update')
 
-        self.manager._event()
+        self.manager._process_events()
 
         event_update.assert_not_called()
 