Merge "Prevent conflicting events from running concurrently"

This commit is contained in:
Zuul 2022-02-24 15:35:43 +00:00 committed by Gerrit Code Review
commit e9c19e2206
2 changed files with 164 additions and 33 deletions

View File

@ -13,7 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import defaultdict
import datetime
from operator import itemgetter
import eventlet
from oslo_config import cfg
@ -81,7 +83,7 @@ class ManagerService(service_utils.RPCServer):
# NOTE(jakecoll): stop_on_exception=False was added because database
# exceptions would prevent threads from being scheduled again.
# TODO(jakecoll): Find a way to test this.
self.tg.add_timer_args(EVENT_INTERVAL, self._event,
self.tg.add_timer_args(EVENT_INTERVAL, self._process_events,
stop_on_exception=False)
for m in self.monitors:
m.start_monitoring()
@ -143,25 +145,12 @@ class ManagerService(service_utils.RPCServer):
return actions
@service_utils.with_empty_context
def _event(self):
"""Tries to commit event.
If there is an event in Blazar DB to be done, do it and change its
status to 'DONE'.
"""
LOG.debug('Trying to get event from DB.')
events = db_api.event_get_all_sorted_by_filters(
sort_key='time',
sort_dir='asc',
filters={'status': status.event.UNDONE,
'time': {'op': 'le',
'border': datetime.datetime.utcnow()}}
)
def _process_events_concurrently(self, events):
if not events:
return
LOG.info("Trying to execute events: %s", events)
event_threads = {}
for event in events:
if not status.LeaseStatus.is_stable(event['lease_id']):
LOG.info("Skip event %s because the status of the lease %s "
@ -170,15 +159,89 @@ class ManagerService(service_utils.RPCServer):
db_api.event_update(event['id'],
{'status': status.event.IN_PROGRESS})
try:
eventlet.spawn_n(
event_thread = eventlet.spawn(
service_utils.with_empty_context(self._exec_event),
event)
event_threads[event['id']] = event_thread
except Exception:
db_api.event_update(event['id'],
{'status': status.event.ERROR})
LOG.exception('Error occurred while event %s handling.',
LOG.exception('Error occurred while spawning event %s.',
event['id'])
for event_id, event_thread in event_threads.items():
try:
event_thread.wait()
except Exception:
db_api.event_update(event['id'],
{'status': status.event.ERROR})
LOG.exception('Error occurred while handling event %s.',
event_id)
def _select_for_execution(self, events):
"""Orders the events such that they can be safely executed concurrently.
Events are selected to be executed concurrently if they are of the same
type, while keeping strict time ordering and the following priority of
event types: before_end_lease, end_lease, and start_lease (except for
before_end_lease events where there is a start_lease event for the same
lease at the same time).
We ensure that:
- the before_end_lease event of a lease is executed after the
start_lease event and before the end_lease event of the same lease,
- for two reservations using the same hosts back to back, the end_lease
event is executed before the start_lease event.
"""
if not events:
return []
events_by_lease = defaultdict(list)
events_by_type = defaultdict(list)
for e in sorted(events, key=itemgetter('time')):
events_by_lease[e['lease_id']].append(e)
events_by_type[e['event_type']].append(e)
# If there is a start_lease event for the same lease, we run it first.
deferred_before_end_events = []
deferred_end_events = []
for start_event in events_by_type['start_lease']:
for e in events_by_lease[start_event['lease_id']]:
if e['event_type'] == 'before_end_lease':
events_by_type['before_end_lease'].remove(e)
deferred_before_end_events.append(e)
elif e['event_type'] == 'end_lease':
events_by_type['end_lease'].remove(e)
deferred_end_events.append(e)
return [
events_by_type['before_end_lease'],
events_by_type['end_lease'],
events_by_type['start_lease'],
deferred_before_end_events,
deferred_end_events
]
def _process_events(self):
"""Tries to execute events.
If there is any event in Blazar DB to be executed, do it and change its
status to 'DONE'. Events are executed concurrently if possible.
"""
LOG.debug('Trying to get events from DB.')
events = db_api.event_get_all_sorted_by_filters(
sort_key='time',
sort_dir='asc',
filters={'status': status.event.UNDONE,
'time': {'op': 'le',
'border': datetime.datetime.utcnow()}}
)
for batch in self._select_for_execution(events):
self._process_events_concurrently(batch)
def _exec_event(self, event):
"""Execute an event function"""
event_fn = getattr(self, event['event_type'], None)

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import datetime
from unittest import mock
@ -269,36 +270,102 @@ class ServiceTestCase(tests.TestCase):
event_update = self.patch(self.db_api, 'event_update')
events.return_value = None
self.manager._event()
self.manager._process_events()
self.assertFalse(event_update.called)
def test_event_success(self):
events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
event_update = self.patch(self.db_api, 'event_update')
events.return_value = [{'id': '111-222-333',
'lease_id': 'lease_id1',
'time': self.good_date},
{'id': '444-555-666',
'lease_id': 'lease_id2',
'time': self.good_date}]
self.patch(eventlet, 'spawn_n')
events.return_value = [{'id': '111-222-333', 'time': self.good_date,
'lease_id': 'aaa-bbb-ccc',
'event_type': 'start_lease'},
{'id': '444-555-666', 'time': self.good_date,
'lease_id': 'bbb-ccc-ddd',
'event_type': 'start_lease'}]
self.patch(eventlet, 'spawn')
self.manager._event()
self.manager._process_events()
event_update.assert_has_calls([
mock.call('111-222-333', {'status': status.event.IN_PROGRESS}),
mock.call('444-555-666', {'status': status.event.IN_PROGRESS})])
def test_concurrent_events(self):
events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
self.patch(self.db_api, 'event_update')
events.return_value = [{'id': '111-222-333', 'time': self.good_date,
'lease_id': 'aaa-bbb-ccc',
'event_type': 'start_lease'},
{'id': '222-333-444', 'time': self.good_date,
'lease_id': 'bbb-ccc-ddd',
'event_type': 'end_lease'},
{'id': '333-444-555', 'time': self.good_date,
'lease_id': 'bbb-ccc-ddd',
'event_type': 'before_end_lease'},
{'id': '444-555-666', 'time': self.good_date,
# Same lease as start_lease event above
'lease_id': 'aaa-bbb-ccc',
'event_type': 'before_end_lease'},
{'id': '444-555-666', 'time': self.good_date,
# Same lease as start_lease event above
'lease_id': 'aaa-bbb-ccc',
'event_type': 'end_lease'},
{'id': '555-666-777', 'time': self.good_date,
'lease_id': 'ccc-ddd-eee',
'event_type': 'end_lease'},
{'id': '666-777-888',
'time': self.good_date + datetime.timedelta(
minutes=1),
'lease_id': 'ddd-eee-fff',
'event_type': 'end_lease'}]
events_values = copy.copy(events.return_value)
_process_events_concurrently = self.patch(
self.manager, '_process_events_concurrently')
self.manager._process_events()
_process_events_concurrently.assert_has_calls([
# First execute the before_end_lease event which doesn't have a
# corresponding start_lease
mock.call([events_values[2]]),
# Then end_lease events
mock.call([events_values[1], events_values[5], events_values[6]]),
# Then the start_lease event
mock.call([events_values[0]]),
# Then the before_end_lease which is for the same lease as the
# previous start_lease event
mock.call([events_values[3]]),
# Then the end_lease which is for the same lease as the previous
# start_lease event
mock.call([events_values[4]])])
def test_process_events_concurrently(self):
events = [{'id': '111-222-333', 'time': self.good_date,
'lease_id': 'aaa-bbb-ccc',
'event_type': 'start_lease'},
{'id': '222-333-444', 'time': self.good_date,
'lease_id': 'bbb-ccc-ddd',
'event_type': 'start_lease'},
{'id': '333-444-555', 'time': self.good_date,
'lease_id': 'ccc-ddd-eee',
'event_type': 'start_lease'}]
spawn = self.patch(eventlet, 'spawn')
self.manager._process_events_concurrently(events)
spawn.assert_has_calls([
mock.call(mock.ANY, events[0]),
mock.call(mock.ANY, events[1]),
mock.call(mock.ANY, events[2])])
def test_event_spawn_fail(self):
events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
event_update = self.patch(self.db_api, 'event_update')
self.patch(eventlet, 'spawn_n').side_effect = Exception
events.return_value = [{'id': '111-222-333',
'lease_id': self.lease_id,
'time': self.good_date}]
self.patch(eventlet, 'spawn').side_effect = Exception
events.return_value = [{'id': '111-222-333', 'time': self.good_date,
'lease_id': 'aaa-bbb-ccc',
'event_type': 'start_lease'}]
self.manager._event()
self.manager._process_events()
event_update.assert_has_calls([
mock.call('111-222-333', {'status': status.event.IN_PROGRESS}),
@ -308,6 +375,7 @@ class ServiceTestCase(tests.TestCase):
events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
events.return_value = [{'id': '111-222-333',
'lease_id': self.lease_id,
'event_type': 'start_lease',
'time': self.good_date}]
self.lease_get = self.patch(self.db_api, 'lease_get')
@ -317,7 +385,7 @@ class ServiceTestCase(tests.TestCase):
event_update = self.patch(self.db_api, 'event_update')
self.manager._event()
self.manager._process_events()
event_update.assert_not_called()