diff --git a/blazar/manager/service.py b/blazar/manager/service.py
index e3e4f803..0478304c 100644
--- a/blazar/manager/service.py
+++ b/blazar/manager/service.py
@@ -13,7 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from collections import defaultdict
 import datetime
+from operator import itemgetter
 
 import eventlet
 from oslo_config import cfg
@@ -81,7 +83,7 @@ class ManagerService(service_utils.RPCServer):
         # NOTE(jakecoll): stop_on_exception=False was added because database
         # exceptions would prevent threads from being scheduled again.
         # TODO(jakecoll): Find a way to test this.
-        self.tg.add_timer_args(EVENT_INTERVAL, self._event,
+        self.tg.add_timer_args(EVENT_INTERVAL, self._process_events,
                                stop_on_exception=False)
         for m in self.monitors:
             m.start_monitoring()
@@ -143,25 +145,12 @@ class ManagerService(service_utils.RPCServer):
         return actions
 
     @service_utils.with_empty_context
-    def _event(self):
-        """Tries to commit event.
-
-        If there is an event in Blazar DB to be done, do it and change its
-        status to 'DONE'.
-        """
-        LOG.debug('Trying to get event from DB.')
-        events = db_api.event_get_all_sorted_by_filters(
-            sort_key='time',
-            sort_dir='asc',
-            filters={'status': status.event.UNDONE,
-                     'time': {'op': 'le',
-                              'border': datetime.datetime.utcnow()}}
-        )
-
+    def _process_events_concurrently(self, events):
         if not events:
             return
 
         LOG.info("Trying to execute events: %s", events)
+        event_threads = {}
         for event in events:
             if not status.LeaseStatus.is_stable(event['lease_id']):
                 LOG.info("Skip event %s because the status of the lease %s "
@@ -170,15 +159,90 @@ class ManagerService(service_utils.RPCServer):
             db_api.event_update(event['id'],
                                 {'status': status.event.IN_PROGRESS})
             try:
-                eventlet.spawn_n(
+                event_thread = eventlet.spawn(
                     service_utils.with_empty_context(self._exec_event),
                     event)
+                event_threads[event['id']] = event_thread
             except Exception:
                 db_api.event_update(event['id'],
                                     {'status': status.event.ERROR})
-                LOG.exception('Error occurred while event %s handling.',
+                LOG.exception('Error occurred while spawning event %s.',
                               event['id'])
 
+        for event_id, event_thread in event_threads.items():
+            try:
+                event_thread.wait()
+            except Exception:
+                # NOTE: use event_id; 'event' is stale from the spawn loop.
+                db_api.event_update(event_id,
+                                    {'status': status.event.ERROR})
+                LOG.exception('Error occurred while handling event %s.',
+                              event_id)
+
+    def _select_for_execution(self, events):
+        """Orders the events such that they can be safely executed concurrently.
+
+        Events are selected to be executed concurrently if they are of the same
+        type, while keeping strict time ordering and the following priority of
+        event types: before_end_lease, end_lease, and start_lease (except for
+        before_end_lease events where there is a start_lease event for the same
+        lease at the same time).
+
+        We ensure that:
+
+        - the before_end_lease event of a lease is executed after the
+          start_lease event and before the end_lease event of the same lease,
+        - for two reservations using the same hosts back to back, the end_lease
+          event is executed before the start_lease event.
+        """
+        if not events:
+            return []
+
+        events_by_lease = defaultdict(list)
+        events_by_type = defaultdict(list)
+
+        for e in sorted(events, key=itemgetter('time')):
+            events_by_lease[e['lease_id']].append(e)
+            events_by_type[e['event_type']].append(e)
+
+        # If there is a start_lease event for the same lease, we run it first.
+        deferred_before_end_events = []
+        deferred_end_events = []
+        for start_event in events_by_type['start_lease']:
+            for e in events_by_lease[start_event['lease_id']]:
+                if e['event_type'] == 'before_end_lease':
+                    events_by_type['before_end_lease'].remove(e)
+                    deferred_before_end_events.append(e)
+                elif e['event_type'] == 'end_lease':
+                    events_by_type['end_lease'].remove(e)
+                    deferred_end_events.append(e)
+
+        return [
+            events_by_type['before_end_lease'],
+            events_by_type['end_lease'],
+            events_by_type['start_lease'],
+            deferred_before_end_events,
+            deferred_end_events
+        ]
+
+    def _process_events(self):
+        """Tries to execute events.
+
+        If there is any event in Blazar DB to be executed, do it and change its
+        status to 'DONE'. Events are executed concurrently if possible.
+        """
+        LOG.debug('Trying to get events from DB.')
+        events = db_api.event_get_all_sorted_by_filters(
+            sort_key='time',
+            sort_dir='asc',
+            filters={'status': status.event.UNDONE,
+                     'time': {'op': 'le',
+                              'border': datetime.datetime.utcnow()}}
+        )
+
+        for batch in self._select_for_execution(events):
+            self._process_events_concurrently(batch)
+
     def _exec_event(self, event):
         """Execute an event function"""
         event_fn = getattr(self, event['event_type'], None)
diff --git a/blazar/tests/manager/test_service.py b/blazar/tests/manager/test_service.py
index 56971054..35e83574 100644
--- a/blazar/tests/manager/test_service.py
+++ b/blazar/tests/manager/test_service.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import copy
 import datetime
 from unittest import mock
 
@@ -269,36 +270,102 @@ class ServiceTestCase(tests.TestCase):
         event_update = self.patch(self.db_api, 'event_update')
         events.return_value = None
 
-        self.manager._event()
+        self.manager._process_events()
 
         self.assertFalse(event_update.called)
 
     def test_event_success(self):
         events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
         event_update = self.patch(self.db_api, 'event_update')
-        events.return_value = [{'id': '111-222-333',
-                                'lease_id': 'lease_id1',
-                                'time': self.good_date},
-                               {'id': '444-555-666',
-                                'lease_id': 'lease_id2',
-                                'time': self.good_date}]
-        self.patch(eventlet, 'spawn_n')
+        events.return_value = [{'id': '111-222-333', 'time': self.good_date,
+                                'lease_id': 'aaa-bbb-ccc',
+                                'event_type': 'start_lease'},
+                               {'id': '444-555-666', 'time': self.good_date,
+                                'lease_id': 'bbb-ccc-ddd',
+                                'event_type': 'start_lease'}]
+        self.patch(eventlet, 'spawn')
 
-        self.manager._event()
+        self.manager._process_events()
 
         event_update.assert_has_calls([
             mock.call('111-222-333', {'status': status.event.IN_PROGRESS}),
             mock.call('444-555-666', {'status': status.event.IN_PROGRESS})])
 
+    def test_concurrent_events(self):
+        events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
+        self.patch(self.db_api, 'event_update')
+        events.return_value = [{'id': '111-222-333', 'time': self.good_date,
+                                'lease_id': 'aaa-bbb-ccc',
+                                'event_type': 'start_lease'},
+                               {'id': '222-333-444', 'time': self.good_date,
+                                'lease_id': 'bbb-ccc-ddd',
+                                'event_type': 'end_lease'},
+                               {'id': '333-444-555', 'time': self.good_date,
+                                'lease_id': 'bbb-ccc-ddd',
+                                'event_type': 'before_end_lease'},
+                               {'id': '444-555-666', 'time': self.good_date,
+                                # Same lease as start_lease event above
+                                'lease_id': 'aaa-bbb-ccc',
+                                'event_type': 'before_end_lease'},
+                               {'id': '444-555-666', 'time': self.good_date,
+                                # Same lease as start_lease event above
+                                'lease_id': 'aaa-bbb-ccc',
+                                'event_type': 'end_lease'},
+                               {'id': '555-666-777', 'time': self.good_date,
+                                'lease_id': 'ccc-ddd-eee',
+                                'event_type': 'end_lease'},
+                               {'id': '666-777-888',
+                                'time': self.good_date + datetime.timedelta(
+                                    minutes=1),
+                                'lease_id': 'ddd-eee-fff',
+                                'event_type': 'end_lease'}]
+        events_values = copy.copy(events.return_value)
+        _process_events_concurrently = self.patch(
+            self.manager, '_process_events_concurrently')
+
+        self.manager._process_events()
+        _process_events_concurrently.assert_has_calls([
+            # First execute the before_end_lease event which doesn't have a
+            # corresponding start_lease
+            mock.call([events_values[2]]),
+            # Then end_lease events
+            mock.call([events_values[1], events_values[5], events_values[6]]),
+            # Then the start_lease event
+            mock.call([events_values[0]]),
+            # Then the before_end_lease which is for the same lease as the
+            # previous start_lease event
+            mock.call([events_values[3]]),
+            # Then the end_lease which is for the same lease as the previous
+            # start_lease event
+            mock.call([events_values[4]])])
+
+    def test_process_events_concurrently(self):
+        events = [{'id': '111-222-333', 'time': self.good_date,
+                   'lease_id': 'aaa-bbb-ccc',
+                   'event_type': 'start_lease'},
+                  {'id': '222-333-444', 'time': self.good_date,
+                   'lease_id': 'bbb-ccc-ddd',
+                   'event_type': 'start_lease'},
+                  {'id': '333-444-555', 'time': self.good_date,
+                   'lease_id': 'ccc-ddd-eee',
+                   'event_type': 'start_lease'}]
+        spawn = self.patch(eventlet, 'spawn')
+
+        self.manager._process_events_concurrently(events)
+        spawn.assert_has_calls([
+            mock.call(mock.ANY, events[0]),
+            mock.call(mock.ANY, events[1]),
+            mock.call(mock.ANY, events[2])])
+
     def test_event_spawn_fail(self):
         events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
         event_update = self.patch(self.db_api, 'event_update')
-        self.patch(eventlet, 'spawn_n').side_effect = Exception
-        events.return_value = [{'id': '111-222-333',
-                                'lease_id': self.lease_id,
-                                'time': self.good_date}]
+        self.patch(eventlet, 'spawn').side_effect = Exception
+        events.return_value = [{'id': '111-222-333', 'time': self.good_date,
+                                'lease_id': 'aaa-bbb-ccc',
+                                'event_type': 'start_lease'}]
 
-        self.manager._event()
+        self.manager._process_events()
 
         event_update.assert_has_calls([
             mock.call('111-222-333', {'status': status.event.IN_PROGRESS}),
@@ -308,6 +375,7 @@ class ServiceTestCase(tests.TestCase):
         events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
         events.return_value = [{'id': '111-222-333',
                                 'lease_id': self.lease_id,
+                                'event_type': 'start_lease',
                                 'time': self.good_date}]
 
         self.lease_get = self.patch(self.db_api, 'lease_get')
@@ -317,7 +385,7 @@ class ServiceTestCase(tests.TestCase):
 
         event_update = self.patch(self.db_api, 'event_update')
 
-        self.manager._event()
+        self.manager._process_events()
 
         event_update.assert_not_called()
 