Fix pipeline event starvation due to cleanup locks

The test_web_labels_allowed_list test has been failing occasionally
due to a timeout waiting to settle. The test ends with an unprocessed
PipelinePostConfig event in the tenant-one.check pipeline management
queue.

In these cases, the scheduler skips processing the tenant-one.check
pipeline because it is locked. We can't see exactly why it's locked,
but based on log messages, it seems likely that one of the periodic
cleanup routines is responsible. These should finish quickly, which
means the starvation is likely due to a race. I suspect the sequence
is (numbers indicate threads):

1] PipelinePostConfig event emitted
2] Cleanup routine (e.g. node cleanup) locks the pipeline
1] Main loop wakes up due to the event notification
1] Main loop fails to lock pipeline
1] Main loop sleeps
2] Cleanup routine completes and unlocks pipeline

At this point, the system is idle, and no more events are emitted, so
the main loop never wakes up again.

To address this, we could set the wake flag each time we encounter a
locked pipeline (we do something similar in other cases), but on a
busy system with two schedulers, that would probably just always set
the flag. The cases where we do that also have other constraints that
make the infinite-wakeup situation less likely.

Instead, whenever we encounter a locked pipeline, let's check to see
if there are any events for that pipeline, and if so, set the wake
flag so that we run through the loop again. This may (probably will)
add an extra loop iteration in many cases (where a second scheduler
will essentially double check that the first scheduler did finish
processing all the events while the first scheduler had the pipeline
locked). But that shouldn't be a huge cost, and it will avoid the
starvation seen in the test, which could really happen on a
nearly-idle production system.

Change-Id: I05536389377b7753fb2430d052d188c7f2a37bff
21 insertions(+), 7 deletions(-)
@@ -2098,6 +2098,17 @@ class Scheduler(threading.Thread):
             except LockException:
                 self.log.debug("Skipping locked pipeline %s in tenant %s",
                                pipeline.name, tenant.name)
+                try:
+                    # In case this pipeline is locked for some reason
+                    # other than processing events, we need to return
+                    # to it to process them.
+                    if self._pipelineHasEvents(tenant, pipeline):
+                        self.wake_event.set()
+                except Exception:
+                    self.log.exception(
+                        "Exception checking events for pipeline "
+                        "%s in tenant %s",
+                        pipeline.name, tenant.name)
             except Exception:
                 self.log.exception(
                     "Exception processing pipeline %s in tenant %s",
@@ -2121,12 +2132,8 @@ class Scheduler(threading.Thread):
                 self.statsd.gauge(f'{stats_key}.write_bytes',
                                   ctx.cumulative_write_bytes)
 
-    def _process_pipeline(self, tenant, pipeline):
-        # Return whether or not we refreshed the pipeline.
-
-        # We only need to process the pipeline if there are
-        # outstanding events.
-        if not any((
+    def _pipelineHasEvents(self, tenant, pipeline):
+        return any((
             self.pipeline_trigger_events[
                 tenant.name][pipeline.name].hasEvents(),
             self.pipeline_result_events[
@@ -2134,7 +2141,14 @@ class Scheduler(threading.Thread):
             self.pipeline_management_events[
                 tenant.name][pipeline.name].hasEvents(),
             pipeline.state.isDirty(self.zk_client.client),
-        )):
+        ))
+
+    def _process_pipeline(self, tenant, pipeline):
+        # Return whether or not we refreshed the pipeline.
+
+        # We only need to process the pipeline if there are
+        # outstanding events.
+        if not self._pipelineHasEvents(tenant, pipeline):
             self.log.debug("No events to process for pipeline %s in tenant %s",
                            pipeline.name, tenant.name)
             return False
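
The suspected race from the commit message can be replayed deterministically in a small, single-threaded sketch. This is not the Zuul scheduler: `FakePipeline`, `MiniScheduler`, `simulate` and the `recheck_locked` switch are hypothetical names invented for illustration, and an in-process `threading.Lock` stands in for the real pipeline lock. It only assumes that the main loop clears its wake flag at the start of each pass and skips pipelines it cannot lock.

```python
import threading


class LockException(Exception):
    """Raised when the pipeline lock cannot be acquired (illustrative)."""


class FakePipeline:
    """Hypothetical stand-in: a named pipeline with a lock and an event queue."""

    def __init__(self, name):
        self.name = name
        self.lock = threading.Lock()
        self.events = []

    def has_events(self):
        return bool(self.events)


class MiniScheduler:
    """Hypothetical main loop reduced to a single pipeline."""

    def __init__(self, recheck_locked):
        self.wake_event = threading.Event()
        # When True, mimic the fix: a locked pipeline with pending
        # events sets the wake flag so the loop runs again.
        self.recheck_locked = recheck_locked
        self.processed = []

    def run_once(self, pipeline):
        # Assumption: the loop clears the flag before each pass.
        self.wake_event.clear()
        try:
            if not pipeline.lock.acquire(blocking=False):
                raise LockException()
            try:
                self.processed.extend(pipeline.events)
                pipeline.events.clear()
            finally:
                pipeline.lock.release()
        except LockException:
            if self.recheck_locked and pipeline.has_events():
                self.wake_event.set()


def simulate(recheck_locked):
    """Replay the sequence from the commit message in a fixed order."""
    pipeline = FakePipeline("check")
    sched = MiniScheduler(recheck_locked)

    pipeline.events.append("PipelinePostConfig")  # 1] event emitted
    sched.wake_event.set()                        #    (notification)
    pipeline.lock.acquire()                       # 2] cleanup locks pipeline
    sched.run_once(pipeline)                      # 1] main loop fails to lock
    pipeline.lock.release()                       # 2] cleanup unlocks pipeline

    # The main loop only runs again if the wake flag is set.
    if sched.wake_event.is_set():
        sched.run_once(pipeline)
    return sched.processed
```

With `recheck_locked=False` the event stays in the queue because nothing wakes the loop after the cleanup routine releases the lock. With `recheck_locked=True` the second pass picks it up, at the cost of one extra iteration.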