Merge "Log tenant read/write lock actions"
This commit is contained in:
commit
4cb2334d79
@ -50,7 +50,7 @@ class TestConfigLoader(ZuulTestCase):
|
||||
loader.loadAuthzRules(abide, unparsed_abide)
|
||||
|
||||
for tenant_name in unparsed_abide.tenants:
|
||||
tlock = tenant_read_lock(self.zk_client, tenant_name)
|
||||
tlock = tenant_read_lock(self.zk_client, tenant_name, self.log)
|
||||
# Consider all caches valid (min. ltime -1)
|
||||
min_ltimes = defaultdict(lambda: defaultdict(lambda: -1))
|
||||
with tlock:
|
||||
|
@ -1047,7 +1047,7 @@ class Client(zuul.cmd.ZuulApp):
|
||||
safe_tenant = urllib.parse.quote_plus(args.tenant)
|
||||
safe_pipeline = urllib.parse.quote_plus(args.pipeline)
|
||||
COMPONENT_REGISTRY.create(zk_client)
|
||||
with tenant_read_lock(zk_client, args.tenant):
|
||||
with tenant_read_lock(zk_client, args.tenant, self.log):
|
||||
path = f'/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}'
|
||||
pipeline = Pipeline(args.tenant, args.pipeline)
|
||||
with pipeline_lock(
|
||||
|
@ -767,7 +767,7 @@ class Scheduler(threading.Thread):
|
||||
for tenant in self.abide.tenants.values():
|
||||
try:
|
||||
with tenant_read_lock(self.zk_client, tenant.name,
|
||||
blocking=False):
|
||||
self.log, blocking=False):
|
||||
if not self.isTenantLayoutUpToDate(tenant.name):
|
||||
self.log.debug(
|
||||
"Skipping leaked pipeline cleanup for tenant %s",
|
||||
@ -1034,11 +1034,13 @@ class Scheduler(threading.Thread):
|
||||
# There is no need to use the reconfig lock ID here as
|
||||
# we are starting from an empty layout state and there
|
||||
# should be no concurrent read locks.
|
||||
lock_ctx = tenant_write_lock(self.zk_client, tenant_name)
|
||||
lock_ctx = tenant_write_lock(self.zk_client,
|
||||
tenant_name, self.log)
|
||||
timer_ctx = self.statsd_timer(
|
||||
f'{stats_key}.reconfiguration_time')
|
||||
else:
|
||||
lock_ctx = tenant_read_lock(self.zk_client, tenant_name)
|
||||
lock_ctx = tenant_read_lock(self.zk_client,
|
||||
tenant_name, self.log)
|
||||
timer_ctx = nullcontext()
|
||||
|
||||
with lock_ctx as tlock, timer_ctx:
|
||||
@ -1284,7 +1286,7 @@ class Scheduler(threading.Thread):
|
||||
self.updateSystemConfig()
|
||||
|
||||
with tenant_read_lock(self.zk_client, tenant_name,
|
||||
blocking=False):
|
||||
log, blocking=False):
|
||||
remote_state = self.tenant_layout_state.get(
|
||||
tenant_name)
|
||||
local_state = self.local_layout_state.get(
|
||||
@ -1541,7 +1543,7 @@ class Scheduler(threading.Thread):
|
||||
|
||||
stats_key = f'zuul.tenant.{tenant_name}'
|
||||
with (tenant_write_lock(
|
||||
self.zk_client, tenant_name,
|
||||
self.zk_client, tenant_name, self.log,
|
||||
identifier=RECONFIG_LOCK_ID) as lock,
|
||||
self.statsd_timer(f'{stats_key}.reconfiguration_time')):
|
||||
tenant = loader.loadTenant(
|
||||
@ -1607,7 +1609,7 @@ class Scheduler(threading.Thread):
|
||||
with self.layout_lock[event.tenant_name]:
|
||||
old_tenant = self.abide.tenants.get(event.tenant_name)
|
||||
with (tenant_write_lock(
|
||||
self.zk_client, event.tenant_name,
|
||||
self.zk_client, event.tenant_name, self.log,
|
||||
identifier=RECONFIG_LOCK_ID) as lock,
|
||||
self.statsd_timer(f'{stats_key}.reconfiguration_time')):
|
||||
log.debug("Loading tenant %s", event.tenant_name)
|
||||
@ -2180,7 +2182,7 @@ class Scheduler(threading.Thread):
|
||||
|
||||
try:
|
||||
with tenant_read_lock(
|
||||
self.zk_client, tenant_name, blocking=False
|
||||
self.zk_client, tenant_name, self.log, blocking=False
|
||||
) as tlock:
|
||||
if not self.isTenantLayoutUpToDate(tenant_name):
|
||||
continue
|
||||
|
@ -2791,7 +2791,7 @@ class ZuulWeb(object):
|
||||
== self.tenant_layout_state.get(tenant_name)):
|
||||
return
|
||||
self.log.debug("Reloading tenant %s", tenant_name)
|
||||
with tenant_read_lock(self.zk_client, tenant_name):
|
||||
with tenant_read_lock(self.zk_client, tenant_name, self.log):
|
||||
layout_state = self.tenant_layout_state.get(tenant_name)
|
||||
layout_uuid = layout_state and layout_state.uuid
|
||||
|
||||
|
@ -122,20 +122,32 @@ def locked(lock, blocking=True, timeout=None):
|
||||
|
||||
|
||||
@contextmanager
|
||||
def tenant_read_lock(client, tenant_name, blocking=True):
|
||||
def tenant_read_lock(client, tenant_name, log=None, blocking=True):
|
||||
safe_tenant = quote_plus(tenant_name)
|
||||
if blocking and log:
|
||||
log.debug("Wait for %s read tenant lock", tenant_name)
|
||||
with locked(
|
||||
SessionAwareReadLock(
|
||||
client.client,
|
||||
f"{TENANT_LOCK_ROOT}/{safe_tenant}"),
|
||||
blocking=blocking
|
||||
) as lock:
|
||||
yield lock
|
||||
try:
|
||||
if log:
|
||||
log.debug("Aquired %s read tenant lock", tenant_name)
|
||||
yield lock
|
||||
finally:
|
||||
if log:
|
||||
log.debug("Released %s read tenant lock", tenant_name)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def tenant_write_lock(client, tenant_name, blocking=True, identifier=None):
|
||||
def tenant_write_lock(client, tenant_name, log=None, blocking=True,
|
||||
identifier=None):
|
||||
safe_tenant = quote_plus(tenant_name)
|
||||
if blocking and log:
|
||||
log.debug("Wait for %s write tenant lock (id: %s)",
|
||||
tenant_name, identifier)
|
||||
with locked(
|
||||
SessionAwareWriteLock(
|
||||
client.client,
|
||||
@ -143,7 +155,15 @@ def tenant_write_lock(client, tenant_name, blocking=True, identifier=None):
|
||||
identifier=identifier),
|
||||
blocking=blocking,
|
||||
) as lock:
|
||||
yield lock
|
||||
try:
|
||||
if log:
|
||||
log.debug("Aquired %s write tenant lock (id: %s)",
|
||||
tenant_name, identifier)
|
||||
yield lock
|
||||
finally:
|
||||
if log:
|
||||
log.debug("Released %s write tenant lock (id: %s)",
|
||||
tenant_name, identifier)
|
||||
|
||||
|
||||
@contextmanager
|
||||
|
Loading…
x
Reference in New Issue
Block a user