Lock tenants, pipelines, queues during processing

In addition to the tenant read lock, schedulers need to also acquire an
exclusive pipeline lock. In case a pipeline is already locked by another
scheduler, it is skipped and processing will continue with the next
pipeline in the tenant.

Tenant management and trigger queues are protected by a queue specific
lock and need to acquire other locks depending on the processed event
(e.g. tenant write lock for tenant reconfigurations).

This also fixes an issue with the locked context manager when an
exception is thrown inside the context. In this case the lock was not
unlocked.

Change-Id: I2e3f0feca7119908d1d13010d2e4de791f95137e
This commit is contained in:
Simon Westphahl 2021-06-11 15:37:33 +02:00
parent 95ee8e8150
commit 0b95dfd07c
3 changed files with 147 additions and 70 deletions

View File

@ -649,7 +649,8 @@ class TestLocks(ZooKeeperBaseTestCase):
def test_locking_ctx(self):
lock = self.zk_client.client.Lock("/lock")
with locked(lock):
with locked(lock) as ctx_lock:
self.assertIs(lock, ctx_lock)
self.assertTrue(lock.is_acquired)
self.assertFalse(lock.is_acquired)
@ -664,6 +665,14 @@ class TestLocks(ZooKeeperBaseTestCase):
pass
self.assertFalse(lock.is_acquired)
def test_unlock_exception(self):
lock = self.zk_client.client.Lock("/lock")
with testtools.ExpectedException(RuntimeError):
with locked(lock):
self.assertTrue(lock.is_acquired)
raise RuntimeError
self.assertFalse(lock.is_acquired)
class TestLayoutStore(ZooKeeperBaseTestCase):

View File

@ -82,7 +82,15 @@ from zuul.zk.event_queues import (
PipelineTriggerEventQueue,
TENANT_ROOT,
)
from zuul.zk.exceptions import LockException
from zuul.zk.layout import LayoutState, LayoutStateStore
from zuul.zk.locks import (
tenant_read_lock,
tenant_write_lock,
pipeline_lock,
management_queue_lock,
trigger_queue_lock,
)
from zuul.zk.nodepool import ZooKeeperNodepool
COMMANDS = ['full-reconfigure', 'smart-reconfigure', 'stop', 'repl', 'norepl']
@ -921,15 +929,16 @@ class Scheduler(threading.Thread):
continue
old_tenant = self.abide.tenants.get(tenant_name)
tenant = loader.loadTenant(self.abide, tenant_name,
self.ansible_manager,
self.unparsed_abide,
cache_ltime=cache_ltime)
reconfigured_tenants.append(tenant_name)
if tenant is not None:
self._reconfigureTenant(tenant, old_tenant)
else:
self._reconfigureDeleteTenant(old_tenant)
with tenant_write_lock(self.zk_client, tenant_name):
tenant = loader.loadTenant(self.abide, tenant_name,
self.ansible_manager,
self.unparsed_abide,
cache_ltime=cache_ltime)
reconfigured_tenants.append(tenant_name)
if tenant is not None:
self._reconfigureTenant(tenant, old_tenant)
else:
self._reconfigureDeleteTenant(old_tenant)
duration = round(time.monotonic() - start, 3)
self.log.info("Reconfiguration complete (smart: %s, "
@ -940,8 +949,7 @@ class Scheduler(threading.Thread):
def _doTenantReconfigureEvent(self, event):
# This is called in the scheduler loop after another thread submits
# a request
self.layout_lock.acquire()
try:
with self.layout_lock:
self.log.info("Tenant reconfiguration beginning for %s due to "
"projects %s",
event.tenant_name, event.project_branches)
@ -962,13 +970,12 @@ class Scheduler(threading.Thread):
old_tenant = self.abide.tenants.get(event.tenant_name)
loader.loadTPCs(self.abide, self.unparsed_abide,
[event.tenant_name])
loader.loadTenant(self.abide, event.tenant_name,
self.ansible_manager, self.unparsed_abide,
cache_ltime=event.zuul_event_ltime)
tenant = self.abide.tenants[event.tenant_name]
self._reconfigureTenant(tenant, old_tenant)
finally:
self.layout_lock.release()
with tenant_write_lock(self.zk_client, event.tenant_name):
loader.loadTenant(self.abide, event.tenant_name,
self.ansible_manager, self.unparsed_abide,
cache_ltime=event.zuul_event_ltime)
tenant = self.abide.tenants[event.tenant_name]
self._reconfigureTenant(tenant, old_tenant)
duration = round(time.monotonic() - start, 3)
self.log.info("Tenant reconfiguration complete for %s (duration: %s "
"seconds)", event.tenant_name, duration)
@ -1359,32 +1366,19 @@ class Scheduler(threading.Thread):
self.process_tenant_management_queue(tenant)
for tenant in self.abide.tenants.values():
if not self._stopped:
# This will forward trigger events to matching
# pipeline event queues that are processed below.
self.process_tenant_trigger_queue(tenant)
try:
with tenant_read_lock(
self.zk_client, tenant.name, blocking=False
):
if not self._stopped:
# This will forward trigger events to pipeline
# event queues that are processed below.
self.process_tenant_trigger_queue(tenant)
for pipeline in tenant.layout.pipelines.values():
self.process_pipeline_management_queue(
tenant, pipeline)
# Give result events priority -- they let us stop
# builds, whereas trigger events cause us to execute
# builds.
self.process_pipeline_result_queue(tenant, pipeline)
self.process_pipeline_trigger_queue(tenant, pipeline)
try:
while (pipeline.manager.processQueue() and
not self._stopped):
pass
except Exception:
self.log.exception(
"Exception in pipeline processing:")
pipeline.state = pipeline.STATE_ERROR
# Continue processing other pipelines+tenants
else:
pipeline.state = pipeline.STATE_NORMAL
self.process_pipelines(tenant)
except LockException:
self.log.debug("Skipping locked tenant %s",
tenant.name)
except Exception:
self.log.exception("Exception in run handler:")
# There may still be more events to process
@ -1392,6 +1386,36 @@ class Scheduler(threading.Thread):
finally:
self.run_handler_lock.release()
def process_pipelines(self, tenant):
for pipeline in tenant.layout.pipelines.values():
if self._stopped:
return
try:
with pipeline_lock(
self.zk_client, tenant.name, pipeline.name, blocking=False
):
self._process_pipeline(tenant, pipeline)
except LockException:
self.log.debug("Skipping locked pipeline %s in tenant %s",
pipeline.name, tenant.name)
def _process_pipeline(self, tenant, pipeline):
self.process_pipeline_management_queue(tenant, pipeline)
# Give result events priority -- they let us stop builds, whereas
# trigger events cause us to execute builds.
self.process_pipeline_result_queue(tenant, pipeline)
self.process_pipeline_trigger_queue(tenant, pipeline)
try:
while not self._stopped and pipeline.manager.processQueue():
pass
except Exception:
self.log.exception("Exception in pipeline processing:")
pipeline.state = pipeline.STATE_ERROR
# Continue processing other pipelines+tenants
else:
pipeline.state = pipeline.STATE_NORMAL
def maintainConnectionCache(self):
# TODOv3(jeblair): update for tenants
relevant = set()
@ -1410,16 +1434,23 @@ class Scheduler(threading.Thread):
self.log.debug("Connection cache size: %s" % len(relevant))
def process_tenant_trigger_queue(self, tenant):
for event in self.trigger_events[tenant.name]:
log = get_annotated_logger(self.log, event.zuul_event_id)
log.debug("Forwarding trigger event %s", event)
try:
self._forward_trigger_event(event, tenant)
except Exception:
log.exception("Unable to forward event %s "
"to tenant %s", event, tenant.name)
finally:
self.trigger_events[tenant.name].ack(event)
try:
with trigger_queue_lock(
self.zk_client, tenant.name, blocking=False
):
for event in self.trigger_events[tenant.name]:
log = get_annotated_logger(self.log, event.zuul_event_id)
log.debug("Forwarding trigger event %s", event)
try:
self._forward_trigger_event(event, tenant)
except Exception:
log.exception("Unable to forward event %s "
"to tenant %s", event, tenant.name)
finally:
self.trigger_events[tenant.name].ack(event)
except LockException:
self.log.debug("Skipping locked trigger event queue in tenant %s",
tenant.name)
def _forward_trigger_event(self, event, tenant):
log = get_annotated_logger(self.log, event.zuul_event_id)
@ -1522,6 +1553,16 @@ class Scheduler(threading.Thread):
pipeline.manager.addChange(change, event)
def process_tenant_management_queue(self, tenant):
try:
with management_queue_lock(
self.zk_client, tenant.name, blocking=False
):
self._process_tenant_management_queue(tenant)
except LockException:
self.log.debug("Skipping locked management event queue"
" in tenant %s", tenant.name)
def _process_tenant_management_queue(self, tenant):
for event in self.management_events[tenant.name]:
event_forwarded = False
try:

View File

@ -26,36 +26,63 @@ TENANT_LOCK_ROOT = f"{LOCK_ROOT}/tenant"
def locked(lock, blocking=True, timeout=None):
if not lock.acquire(blocking=blocking, timeout=timeout):
raise LockException(f"Failed to acquire lock {lock}")
yield
try:
lock.release()
except Exception:
log = logging.getLogger("zuul.zk.locks")
log.exception("Failed to release lock %s", lock)
yield lock
finally:
try:
lock.release()
except Exception:
log = logging.getLogger("zuul.zk.locks")
log.exception("Failed to release lock %s", lock)
def tenant_read_lock(client, tenant_name):
@contextmanager
def tenant_read_lock(client, tenant_name, blocking=True):
safe_tenant = quote_plus(tenant_name)
return client.client.ReadLock(f"{TENANT_LOCK_ROOT}/{safe_tenant}")
with locked(
client.client.ReadLock(f"{TENANT_LOCK_ROOT}/{safe_tenant}"),
blocking=blocking
) as lock:
yield lock
def tenant_write_lock(client, tenant_name):
@contextmanager
def tenant_write_lock(client, tenant_name, blocking=True):
safe_tenant = quote_plus(tenant_name)
return client.client.WriteLock(f"{TENANT_LOCK_ROOT}/{safe_tenant}")
with locked(
client.client.WriteLock(f"{TENANT_LOCK_ROOT}/{safe_tenant}"),
blocking=blocking
) as lock:
yield lock
def pipeline_lock(client, tenant_name, pipeline_name):
@contextmanager
def pipeline_lock(client, tenant_name, pipeline_name, blocking=True):
safe_tenant = quote_plus(tenant_name)
safe_pipeline = quote_plus(pipeline_name)
return client.client.Lock(
f"/zuul/locks/pipeline/{safe_tenant}/{safe_pipeline}")
with locked(
client.client.Lock(
f"/zuul/locks/pipeline/{safe_tenant}/{safe_pipeline}"),
blocking=blocking
) as lock:
yield lock
def management_queue_lock(client, tenant_name):
@contextmanager
def management_queue_lock(client, tenant_name, blocking=True):
safe_tenant = quote_plus(tenant_name)
return client.client.Lock(f"/zuul/locks/events/management/{safe_tenant}")
with locked(
client.client.Lock(f"/zuul/locks/events/management/{safe_tenant}"),
blocking=blocking
) as lock:
yield lock
def trigger_queue_lock(client, tenant_name):
@contextmanager
def trigger_queue_lock(client, tenant_name, blocking=True):
safe_tenant = quote_plus(tenant_name)
return client.client.Lock(f"/zuul/locks/events/trigger/{safe_tenant}")
with locked(
client.client.Lock(f"/zuul/locks/events/trigger/{safe_tenant}"),
blocking=blocking
) as lock:
yield lock