Merge "Lock tenants, pipelines, queues during processing"
This commit is contained in:
commit
ea6e56a312
|
@ -649,7 +649,8 @@ class TestLocks(ZooKeeperBaseTestCase):
|
|||
|
||||
def test_locking_ctx(self):
|
||||
lock = self.zk_client.client.Lock("/lock")
|
||||
with locked(lock):
|
||||
with locked(lock) as ctx_lock:
|
||||
self.assertIs(lock, ctx_lock)
|
||||
self.assertTrue(lock.is_acquired)
|
||||
self.assertFalse(lock.is_acquired)
|
||||
|
||||
|
@ -664,6 +665,14 @@ class TestLocks(ZooKeeperBaseTestCase):
|
|||
pass
|
||||
self.assertFalse(lock.is_acquired)
|
||||
|
||||
def test_unlock_exception(self):
|
||||
lock = self.zk_client.client.Lock("/lock")
|
||||
with testtools.ExpectedException(RuntimeError):
|
||||
with locked(lock):
|
||||
self.assertTrue(lock.is_acquired)
|
||||
raise RuntimeError
|
||||
self.assertFalse(lock.is_acquired)
|
||||
|
||||
|
||||
class TestLayoutStore(ZooKeeperBaseTestCase):
|
||||
|
||||
|
|
|
@ -82,7 +82,15 @@ from zuul.zk.event_queues import (
|
|||
PipelineTriggerEventQueue,
|
||||
TENANT_ROOT,
|
||||
)
|
||||
from zuul.zk.exceptions import LockException
|
||||
from zuul.zk.layout import LayoutState, LayoutStateStore
|
||||
from zuul.zk.locks import (
|
||||
tenant_read_lock,
|
||||
tenant_write_lock,
|
||||
pipeline_lock,
|
||||
management_queue_lock,
|
||||
trigger_queue_lock,
|
||||
)
|
||||
from zuul.zk.nodepool import ZooKeeperNodepool
|
||||
|
||||
COMMANDS = ['full-reconfigure', 'smart-reconfigure', 'stop', 'repl', 'norepl']
|
||||
|
@ -914,15 +922,16 @@ class Scheduler(threading.Thread):
|
|||
continue
|
||||
|
||||
old_tenant = self.abide.tenants.get(tenant_name)
|
||||
tenant = loader.loadTenant(self.abide, tenant_name,
|
||||
self.ansible_manager,
|
||||
self.unparsed_abide,
|
||||
cache_ltime=cache_ltime)
|
||||
reconfigured_tenants.append(tenant_name)
|
||||
if tenant is not None:
|
||||
self._reconfigureTenant(tenant, old_tenant)
|
||||
else:
|
||||
self._reconfigureDeleteTenant(old_tenant)
|
||||
with tenant_write_lock(self.zk_client, tenant_name):
|
||||
tenant = loader.loadTenant(self.abide, tenant_name,
|
||||
self.ansible_manager,
|
||||
self.unparsed_abide,
|
||||
cache_ltime=cache_ltime)
|
||||
reconfigured_tenants.append(tenant_name)
|
||||
if tenant is not None:
|
||||
self._reconfigureTenant(tenant, old_tenant)
|
||||
else:
|
||||
self._reconfigureDeleteTenant(old_tenant)
|
||||
|
||||
duration = round(time.monotonic() - start, 3)
|
||||
self.log.info("Reconfiguration complete (smart: %s, "
|
||||
|
@ -933,8 +942,7 @@ class Scheduler(threading.Thread):
|
|||
def _doTenantReconfigureEvent(self, event):
|
||||
# This is called in the scheduler loop after another thread submits
|
||||
# a request
|
||||
self.layout_lock.acquire()
|
||||
try:
|
||||
with self.layout_lock:
|
||||
self.log.info("Tenant reconfiguration beginning for %s due to "
|
||||
"projects %s",
|
||||
event.tenant_name, event.project_branches)
|
||||
|
@ -955,13 +963,12 @@ class Scheduler(threading.Thread):
|
|||
old_tenant = self.abide.tenants.get(event.tenant_name)
|
||||
loader.loadTPCs(self.abide, self.unparsed_abide,
|
||||
[event.tenant_name])
|
||||
loader.loadTenant(self.abide, event.tenant_name,
|
||||
self.ansible_manager, self.unparsed_abide,
|
||||
cache_ltime=event.zuul_event_ltime)
|
||||
tenant = self.abide.tenants[event.tenant_name]
|
||||
self._reconfigureTenant(tenant, old_tenant)
|
||||
finally:
|
||||
self.layout_lock.release()
|
||||
with tenant_write_lock(self.zk_client, event.tenant_name):
|
||||
loader.loadTenant(self.abide, event.tenant_name,
|
||||
self.ansible_manager, self.unparsed_abide,
|
||||
cache_ltime=event.zuul_event_ltime)
|
||||
tenant = self.abide.tenants[event.tenant_name]
|
||||
self._reconfigureTenant(tenant, old_tenant)
|
||||
duration = round(time.monotonic() - start, 3)
|
||||
self.log.info("Tenant reconfiguration complete for %s (duration: %s "
|
||||
"seconds)", event.tenant_name, duration)
|
||||
|
@ -1347,32 +1354,19 @@ class Scheduler(threading.Thread):
|
|||
self.process_tenant_management_queue(tenant)
|
||||
|
||||
for tenant in self.abide.tenants.values():
|
||||
if not self._stopped:
|
||||
# This will forward trigger events to matching
|
||||
# pipeline event queues that are processed below.
|
||||
self.process_tenant_trigger_queue(tenant)
|
||||
try:
|
||||
with tenant_read_lock(
|
||||
self.zk_client, tenant.name, blocking=False
|
||||
):
|
||||
if not self._stopped:
|
||||
# This will forward trigger events to pipeline
|
||||
# event queues that are processed below.
|
||||
self.process_tenant_trigger_queue(tenant)
|
||||
|
||||
for pipeline in tenant.layout.pipelines.values():
|
||||
self.process_pipeline_management_queue(
|
||||
tenant, pipeline)
|
||||
|
||||
# Give result events priority -- they let us stop
|
||||
# builds, whereas trigger events cause us to execute
|
||||
# builds.
|
||||
self.process_pipeline_result_queue(tenant, pipeline)
|
||||
|
||||
self.process_pipeline_trigger_queue(tenant, pipeline)
|
||||
try:
|
||||
while (pipeline.manager.processQueue() and
|
||||
not self._stopped):
|
||||
pass
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Exception in pipeline processing:")
|
||||
pipeline.state = pipeline.STATE_ERROR
|
||||
# Continue processing other pipelines+tenants
|
||||
else:
|
||||
pipeline.state = pipeline.STATE_NORMAL
|
||||
self.process_pipelines(tenant)
|
||||
except LockException:
|
||||
self.log.debug("Skipping locked tenant %s",
|
||||
tenant.name)
|
||||
except Exception:
|
||||
self.log.exception("Exception in run handler:")
|
||||
# There may still be more events to process
|
||||
|
@ -1380,6 +1374,36 @@ class Scheduler(threading.Thread):
|
|||
finally:
|
||||
self.run_handler_lock.release()
|
||||
|
||||
def process_pipelines(self, tenant):
|
||||
for pipeline in tenant.layout.pipelines.values():
|
||||
if self._stopped:
|
||||
return
|
||||
try:
|
||||
with pipeline_lock(
|
||||
self.zk_client, tenant.name, pipeline.name, blocking=False
|
||||
):
|
||||
self._process_pipeline(tenant, pipeline)
|
||||
|
||||
except LockException:
|
||||
self.log.debug("Skipping locked pipeline %s in tenant %s",
|
||||
pipeline.name, tenant.name)
|
||||
|
||||
def _process_pipeline(self, tenant, pipeline):
|
||||
self.process_pipeline_management_queue(tenant, pipeline)
|
||||
# Give result events priority -- they let us stop builds, whereas
|
||||
# trigger events cause us to execute builds.
|
||||
self.process_pipeline_result_queue(tenant, pipeline)
|
||||
self.process_pipeline_trigger_queue(tenant, pipeline)
|
||||
try:
|
||||
while not self._stopped and pipeline.manager.processQueue():
|
||||
pass
|
||||
except Exception:
|
||||
self.log.exception("Exception in pipeline processing:")
|
||||
pipeline.state = pipeline.STATE_ERROR
|
||||
# Continue processing other pipelines+tenants
|
||||
else:
|
||||
pipeline.state = pipeline.STATE_NORMAL
|
||||
|
||||
def maintainConnectionCache(self):
|
||||
# TODOv3(jeblair): update for tenants
|
||||
relevant = set()
|
||||
|
@ -1398,16 +1422,23 @@ class Scheduler(threading.Thread):
|
|||
self.log.debug("Connection cache size: %s" % len(relevant))
|
||||
|
||||
def process_tenant_trigger_queue(self, tenant):
|
||||
for event in self.trigger_events[tenant.name]:
|
||||
log = get_annotated_logger(self.log, event.zuul_event_id)
|
||||
log.debug("Forwarding trigger event %s", event)
|
||||
try:
|
||||
self._forward_trigger_event(event, tenant)
|
||||
except Exception:
|
||||
log.exception("Unable to forward event %s "
|
||||
"to tenant %s", event, tenant.name)
|
||||
finally:
|
||||
self.trigger_events[tenant.name].ack(event)
|
||||
try:
|
||||
with trigger_queue_lock(
|
||||
self.zk_client, tenant.name, blocking=False
|
||||
):
|
||||
for event in self.trigger_events[tenant.name]:
|
||||
log = get_annotated_logger(self.log, event.zuul_event_id)
|
||||
log.debug("Forwarding trigger event %s", event)
|
||||
try:
|
||||
self._forward_trigger_event(event, tenant)
|
||||
except Exception:
|
||||
log.exception("Unable to forward event %s "
|
||||
"to tenant %s", event, tenant.name)
|
||||
finally:
|
||||
self.trigger_events[tenant.name].ack(event)
|
||||
except LockException:
|
||||
self.log.debug("Skipping locked trigger event queue in tenant %s",
|
||||
tenant.name)
|
||||
|
||||
def _forward_trigger_event(self, event, tenant):
|
||||
log = get_annotated_logger(self.log, event.zuul_event_id)
|
||||
|
@ -1510,6 +1541,16 @@ class Scheduler(threading.Thread):
|
|||
pipeline.manager.addChange(change, event)
|
||||
|
||||
def process_tenant_management_queue(self, tenant):
|
||||
try:
|
||||
with management_queue_lock(
|
||||
self.zk_client, tenant.name, blocking=False
|
||||
):
|
||||
self._process_tenant_management_queue(tenant)
|
||||
except LockException:
|
||||
self.log.debug("Skipping locked management event queue"
|
||||
" in tenant %s", tenant.name)
|
||||
|
||||
def _process_tenant_management_queue(self, tenant):
|
||||
for event in self.management_events[tenant.name]:
|
||||
event_forwarded = False
|
||||
try:
|
||||
|
|
|
@ -26,36 +26,63 @@ TENANT_LOCK_ROOT = f"{LOCK_ROOT}/tenant"
|
|||
def locked(lock, blocking=True, timeout=None):
|
||||
if not lock.acquire(blocking=blocking, timeout=timeout):
|
||||
raise LockException(f"Failed to acquire lock {lock}")
|
||||
yield
|
||||
try:
|
||||
lock.release()
|
||||
except Exception:
|
||||
log = logging.getLogger("zuul.zk.locks")
|
||||
log.exception("Failed to release lock %s", lock)
|
||||
yield lock
|
||||
finally:
|
||||
try:
|
||||
lock.release()
|
||||
except Exception:
|
||||
log = logging.getLogger("zuul.zk.locks")
|
||||
log.exception("Failed to release lock %s", lock)
|
||||
|
||||
|
||||
def tenant_read_lock(client, tenant_name):
|
||||
@contextmanager
|
||||
def tenant_read_lock(client, tenant_name, blocking=True):
|
||||
safe_tenant = quote_plus(tenant_name)
|
||||
return client.client.ReadLock(f"{TENANT_LOCK_ROOT}/{safe_tenant}")
|
||||
with locked(
|
||||
client.client.ReadLock(f"{TENANT_LOCK_ROOT}/{safe_tenant}"),
|
||||
blocking=blocking
|
||||
) as lock:
|
||||
yield lock
|
||||
|
||||
|
||||
def tenant_write_lock(client, tenant_name):
|
||||
@contextmanager
|
||||
def tenant_write_lock(client, tenant_name, blocking=True):
|
||||
safe_tenant = quote_plus(tenant_name)
|
||||
return client.client.WriteLock(f"{TENANT_LOCK_ROOT}/{safe_tenant}")
|
||||
with locked(
|
||||
client.client.WriteLock(f"{TENANT_LOCK_ROOT}/{safe_tenant}"),
|
||||
blocking=blocking
|
||||
) as lock:
|
||||
yield lock
|
||||
|
||||
|
||||
def pipeline_lock(client, tenant_name, pipeline_name):
|
||||
@contextmanager
|
||||
def pipeline_lock(client, tenant_name, pipeline_name, blocking=True):
|
||||
safe_tenant = quote_plus(tenant_name)
|
||||
safe_pipeline = quote_plus(pipeline_name)
|
||||
return client.client.Lock(
|
||||
f"/zuul/locks/pipeline/{safe_tenant}/{safe_pipeline}")
|
||||
with locked(
|
||||
client.client.Lock(
|
||||
f"/zuul/locks/pipeline/{safe_tenant}/{safe_pipeline}"),
|
||||
blocking=blocking
|
||||
) as lock:
|
||||
yield lock
|
||||
|
||||
|
||||
def management_queue_lock(client, tenant_name):
|
||||
@contextmanager
|
||||
def management_queue_lock(client, tenant_name, blocking=True):
|
||||
safe_tenant = quote_plus(tenant_name)
|
||||
return client.client.Lock(f"/zuul/locks/events/management/{safe_tenant}")
|
||||
with locked(
|
||||
client.client.Lock(f"/zuul/locks/events/management/{safe_tenant}"),
|
||||
blocking=blocking
|
||||
) as lock:
|
||||
yield lock
|
||||
|
||||
|
||||
def trigger_queue_lock(client, tenant_name):
|
||||
@contextmanager
|
||||
def trigger_queue_lock(client, tenant_name, blocking=True):
|
||||
safe_tenant = quote_plus(tenant_name)
|
||||
return client.client.Lock(f"/zuul/locks/events/trigger/{safe_tenant}")
|
||||
with locked(
|
||||
client.client.Lock(f"/zuul/locks/events/trigger/{safe_tenant}"),
|
||||
blocking=blocking
|
||||
) as lock:
|
||||
yield lock
|
||||
|
|
Loading…
Reference in New Issue