From 864a2b7701f47c82dfda218538fd58970896ec2a Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Tue, 8 Feb 2022 13:27:06 -0800 Subject: [PATCH] Make a global component registry We generally try to avoid global variables, but in this case, it may be helpful to set the component registry as a global variable. We need the component registry to determine the ZK data model API version. It's relatively straightforward to pass it through the zkcontext for zkobjects, but we also may need it in other places where we might alter processing of data we previously got from zk (eg, the semaphore cleanup). Or we might need it in serialize or deserialize methods of non-zkobjects (for example, ChangeKey). To account for all potential future uses, instantiate a global singleton object which holds a registry and use that instead of local-scoped component registry objects. We also add a clear method so that we can be sure unit tests start with clean data. Change-Id: Ib764dbc3a3fe39ad6d70d4807b8035777d727d93 --- tests/base.py | 15 ++++++++++++--- tests/unit/test_model_upgrade.py | 2 ++ tests/unit/test_zk.py | 24 ++++++++---------------- zuul/cmd/client.py | 7 +++---- zuul/configloader.py | 3 +-- zuul/executor/server.py | 7 +++---- zuul/lib/fingergw.py | 7 +++---- zuul/lib/streamer_utils.py | 11 ++++++----- zuul/manager/__init__.py | 3 ++- zuul/scheduler.py | 7 +++---- zuul/web/__init__.py | 9 ++++----- zuul/zk/branch_cache.py | 3 +-- zuul/zk/components.py | 20 ++++++++++++++++++++ zuul/zk/semaphore.py | 11 +++++------ zuul/zk/zkobject.py | 13 +------------ 15 files changed, 74 insertions(+), 68 deletions(-) diff --git a/tests/base.py b/tests/base.py index da7b665385..233621f3d9 100644 --- a/tests/base.py +++ b/tests/base.py @@ -93,7 +93,7 @@ from zuul.driver.elasticsearch import ElasticsearchDriver from zuul.lib.collections import DefaultKeyDict from zuul.lib.connections import ConnectionRegistry from zuul.zk import zkobject, ZooKeeperClient -from zuul.zk.components import SchedulerComponent +from zuul.zk.components import SchedulerComponent, COMPONENT_REGISTRY from zuul.zk.event_queues import ConnectionEventQueue from zuul.zk.executor import ExecutorApi from zuul.zk.locks import tenant_read_lock, pipeline_lock, SessionAwareLock @@ -4032,6 +4032,15 @@ class PrometheusFixture(fixtures.Fixture): prometheus_client.registry.REGISTRY.unregister(collector) +class GlobalRegistryFixture(fixtures.Fixture): + def _setUp(self): + self.addCleanup(self._cleanup) + + def _cleanup(self): + # Remove our component registry from the global + COMPONENT_REGISTRY.clearRegistry() + + class FakeCPUTimes: def __init__(self): self.user = 0 @@ -4072,6 +4081,7 @@ class BaseTestCase(testtools.TestCase): def setUp(self): super(BaseTestCase, self).setUp() self.useFixture(PrometheusFixture()) + self.useFixture(GlobalRegistryFixture()) test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0) try: test_timeout = int(test_timeout) @@ -4600,8 +4610,7 @@ class ZuulTestCase(BaseTestCase): self._context_lock.acquire(blocking=False) lock = self._context_lock return zkobject.ZKContext(self.zk_client, lock, - None, self.log, - self.scheds.first.sched.component_registry) + None, self.log) def __event_queues(self, matcher) -> List[Queue]: # TODO (swestphahl): Can be removed when we no longer use global diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py index 2a8db82dcb..a0d726e4b8 100644 --- a/tests/unit/test_model_upgrade.py +++ b/tests/unit/test_model_upgrade.py @@ -119,6 +119,7 @@ class TestSemaphoreModelUpgrade(ZuulTestCase): semaphore_holders = tenant.semaphore_handler.semaphoreHolders( "test-semaphore") + self.log.debug("Semaphore holders: %s", repr(semaphore_holders)) self.assertEqual(len(semaphore_holders), 1) # Assert that we are still using the old-style handler format self.assertTrue(all(isinstance(h, str) for h in semaphore_holders)) @@ -177,6 +178,7 @@ class TestSemaphoreModelUpgrade(ZuulTestCase): tenant.semaphore_handler.acquire(item, job, False) semaphore_holders = tenant.semaphore_handler.semaphoreHolders( "test-semaphore") + self.log.debug("Semaphore holders: %s", repr(semaphore_holders)) self.assertEqual(len(semaphore_holders), 1) # Assert that we are now using the new-style handler format self.assertTrue(all(isinstance(h, dict) for h in semaphore_holders)) diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py index b3237072ec..9da8218cda 100644 --- a/tests/unit/test_zk.py +++ b/tests/unit/test_zk.py @@ -1505,8 +1505,7 @@ class TestZKObject(ZooKeeperBaseTestCase): # Create a new object tenant_name = 'fake_tenant' with tenant_write_lock(self.zk_client, tenant_name) as lock: - context = ZKContext(self.zk_client, lock, stop_event, self.log, - self.component_registry) + context = ZKContext(self.zk_client, lock, stop_event, self.log) pipeline1 = zkobject_class.new(context, name=tenant_name, foo='bar') @@ -1517,8 +1516,7 @@ class TestZKObject(ZooKeeperBaseTestCase): # Load an object from ZK (that we don't already have) with tenant_write_lock(self.zk_client, tenant_name) as lock: - context = ZKContext(self.zk_client, lock, stop_event, self.log, - self.component_registry) + context = ZKContext(self.zk_client, lock, stop_event, self.log) pipeline2 = zkobject_class.fromZK(context, '/zuul/pipeline/fake_tenant') self.assertEqual(pipeline2.foo, 'bar') @@ -1532,8 +1530,7 @@ class TestZKObject(ZooKeeperBaseTestCase): # Update an object with tenant_write_lock(self.zk_client, tenant_name) as lock: - context = ZKContext(self.zk_client, lock, stop_event, self.log, - self.component_registry) + context = ZKContext(self.zk_client, lock, stop_event, self.log) ltime1 = get_ltime(pipeline1) pipeline1.updateAttributes(context, foo='qux') self.assertEqual(pipeline1.foo, 'qux') @@ -1547,8 +1544,7 @@ class TestZKObject(ZooKeeperBaseTestCase): # Update an object using an active context with tenant_write_lock(self.zk_client, tenant_name) as lock: - context = ZKContext(self.zk_client, lock, stop_event, self.log, - self.component_registry) + context = ZKContext(self.zk_client, lock, stop_event, self.log) ltime1 = get_ltime(pipeline1) with pipeline1.activeContext(context): pipeline1.foo = 'baz' @@ -1569,15 +1565,13 @@ class TestZKObject(ZooKeeperBaseTestCase): # Refresh an existing object with tenant_write_lock(self.zk_client, tenant_name) as lock: - context = ZKContext(self.zk_client, lock, stop_event, self.log, - self.component_registry) + context = ZKContext(self.zk_client, lock, stop_event, self.log) pipeline2.refresh(context) self.assertEqual(pipeline2.foo, 'baz') # Delete an object with tenant_write_lock(self.zk_client, tenant_name) as lock: - context = ZKContext(self.zk_client, lock, stop_event, self.log, - self.component_registry) + context = ZKContext(self.zk_client, lock, stop_event, self.log) self.assertIsNotNone(self.zk_client.client.exists( '/zuul/pipeline/fake_tenant')) pipeline2.delete(context) @@ -1620,8 +1614,7 @@ class TestZKObject(ZooKeeperBaseTestCase): # Fail an update with tenant_write_lock(self.zk_client, tenant_name) as lock: - context = ZKContext(self.zk_client, lock, stop_event, self.log, - self.component_registry) + context = ZKContext(self.zk_client, lock, stop_event, self.log) pipeline1 = zkobject_class.new(context, name=tenant_name, foo='one') @@ -1815,8 +1808,7 @@ class TestConfigurationErrorList(ZooKeeperBaseTestCase): # Create a new object with tenant_write_lock(self.zk_client, 'test') as lock: - context = ZKContext(self.zk_client, lock, stop_event, self.log, - self.component_registry) + context = ZKContext(self.zk_client, lock, stop_event, self.log) pipeline = DummyZKObject.new(context, name="test", foo="bar") e1 = model.ConfigurationError( source_context, start_mark, "Test error1") diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py index dc8d4772c2..fc1ea6a2ea 100755 --- a/zuul/cmd/client.py +++ b/zuul/cmd/client.py @@ -37,7 +37,7 @@ from zuul.lib.keystorage import KeyStorage from zuul.zk.locks import tenant_write_lock from zuul.zk.zkobject import ZKContext from zuul.zk.layout import LayoutState, LayoutStateStore -from zuul.zk.components import ComponentRegistry +from zuul.zk.components import COMPONENT_REGISTRY # todo This should probably live somewhere else @@ -963,15 +963,14 @@ class Client(zuul.cmd.ZuulApp): args = self.args safe_tenant = urllib.parse.quote_plus(args.tenant) safe_pipeline = urllib.parse.quote_plus(args.pipeline) - component_registry = ComponentRegistry(zk_client) + COMPONENT_REGISTRY.create(zk_client) with tenant_write_lock(zk_client, args.tenant) as lock: path = f'/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}' layout_uuid = None zk_client.client.delete( f'/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}', recursive=True) - context = ZKContext(zk_client, lock, None, self.log, - component_registry) + context = ZKContext(zk_client, lock, None, self.log) ps = PipelineState.new(context, _path=path, layout_uuid=layout_uuid) # Force everyone to make a new layout for this tenant in diff --git a/zuul/configloader.py b/zuul/configloader.py index 7ed296d837..d5adf0542a 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -1640,8 +1640,7 @@ class TenantParser(object): tenant, parsed_config, loading_errors, layout_uuid) if self.scheduler: tenant.semaphore_handler = SemaphoreHandler( - self.zk_client, self.statsd, tenant.name, tenant.layout, - self.scheduler.component_registry + self.zk_client, self.statsd, tenant.name, tenant.layout ) return tenant diff --git a/zuul/executor/server.py b/zuul/executor/server.py index 3330e96415..acb2e3cc71 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -72,7 +72,7 @@ import zuul.model from zuul.nodepool import Nodepool from zuul.version import get_version_string from zuul.zk.event_queues import PipelineResultEventQueue -from zuul.zk.components import ExecutorComponent, ComponentRegistry +from zuul.zk.components import ExecutorComponent, COMPONENT_REGISTRY from zuul.zk.exceptions import JobRequestNotFound from zuul.zk.executor import ExecutorApi from zuul.zk.job_request_queue import JobRequestEvent @@ -3195,9 +3195,8 @@ class ExecutorServer(BaseMergeServer): self.component_info = ExecutorComponent( self.zk_client, self.hostname, version=get_version_string()) self.component_info.register() - self.component_registry = ComponentRegistry(self.zk_client) - self.zk_context = ZKContext(self.zk_client, None, None, self.log, - self.component_registry) + COMPONENT_REGISTRY.create(self.zk_client) + self.zk_context = ZKContext(self.zk_client, None, None, self.log) self.monitoring_server = MonitoringServer(self.config, 'executor', self.component_info) self.monitoring_server.start() diff --git a/zuul/lib/fingergw.py b/zuul/lib/fingergw.py index dd2218b262..90eb032ea5 100644 --- a/zuul/lib/fingergw.py +++ b/zuul/lib/fingergw.py @@ -28,7 +28,7 @@ from zuul.lib.config import get_default from zuul.lib.monitoring import MonitoringServer from zuul.version import get_version_string from zuul.zk import ZooKeeperClient -from zuul.zk.components import ComponentRegistry, FingerGatewayComponent +from zuul.zk.components import COMPONENT_REGISTRY, FingerGatewayComponent from zuul.zk.executor import ExecutorApi COMMANDS = [ @@ -88,7 +88,7 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler): try: build_uuid = self.getCommand() port_location = streamer_utils.getJobLogStreamAddress( - self.fingergw.executor_api, self.fingergw.component_registry, + self.fingergw.executor_api, build_uuid, source_zone=self.fingergw.zone) if not port_location: @@ -190,13 +190,12 @@ class FingerGateway(object): if self.tls_listen: self.component_info.use_ssl = True self.component_info.register() + COMPONENT_REGISTRY.create(self.zk_client) self.monitoring_server = MonitoringServer(config, 'fingergw', self.component_info) self.monitoring_server.start() - self.component_registry = ComponentRegistry(self.zk_client) - self.executor_api = ExecutorApi(self.zk_client, use_cache=False) def _runCommand(self): diff --git a/zuul/lib/streamer_utils.py b/zuul/lib/streamer_utils.py index 1882dd64a1..5ebdcc6b86 100644 --- a/zuul/lib/streamer_utils.py +++ b/zuul/lib/streamer_utils.py @@ -30,6 +30,7 @@ import threading import time from zuul.exceptions import StreamingError +from zuul.zk.components import COMPONENT_REGISTRY log = logging.getLogger("zuul.lib.streamer_utils") @@ -175,8 +176,7 @@ class CustomThreadingTCPServer(socketserver.ThreadingTCPServer): return sock, addr -def getJobLogStreamAddress(executor_api, component_registry, uuid, - source_zone): +def getJobLogStreamAddress(executor_api, uuid, source_zone): """ Looks up the log stream address for the given build UUID. @@ -199,7 +199,7 @@ def getJobLogStreamAddress(executor_api, component_registry, uuid, job_log_stream_address = {} if worker_zone and source_zone != worker_zone: - info = _getFingerGatewayInZone(component_registry, worker_zone) + info = _getFingerGatewayInZone(worker_zone) if info: job_log_stream_address['server'] = info.hostname job_log_stream_address['port'] = info.public_port @@ -225,8 +225,9 @@ def getJobLogStreamAddress(executor_api, component_registry, uuid, return job_log_stream_address -def _getFingerGatewayInZone(component_registry, zone): - gws = [gw for gw in component_registry.all('fingergw') if gw.zone == zone] +def _getFingerGatewayInZone(zone): + registry = COMPONENT_REGISTRY.registry + gws = [gw for gw in registry.all('fingergw') if gw.zone == zone] if gws: return random.choice(gws) return None diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index b0677e4c33..070065ba1f 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -26,6 +26,7 @@ from zuul.model import ( Change, DequeueEvent, PipelineState, PipelineChangeList, QueueItem, ) from zuul.zk.change_cache import ChangeKey +from zuul.zk.components import COMPONENT_REGISTRY from zuul.zk.locks import pipeline_lock @@ -731,7 +732,7 @@ class PipelineManager(metaclass=ABCMeta): continue # MODEL_API: >2 - if self.sched.component_registry.model_api > 2: + if COMPONENT_REGISTRY.model_api > 2: event = model.SupercedeEvent( other_pipeline.tenant.name, other_pipeline.name, diff --git a/zuul/scheduler.py b/zuul/scheduler.py index e7cd3c0eff..7f204616aa 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -81,7 +81,7 @@ from zuul.zk.cleanup import ( NodeRequestCleanupLock, ) from zuul.zk.components import ( - BaseComponent, ComponentRegistry, SchedulerComponent + BaseComponent, COMPONENT_REGISTRY, SchedulerComponent ) from zuul.zk.config_cache import SystemConfigCache, UnparsedConfigCache from zuul.zk.event_queues import ( @@ -236,7 +236,7 @@ class Scheduler(threading.Thread): self.component_info = SchedulerComponent( self.zk_client, self.hostname, version=self.zuul_version) self.component_info.register() - self.component_registry = ComponentRegistry(self.zk_client) + self.component_registry = COMPONENT_REGISTRY.create(self.zk_client) self.system_config_cache = SystemConfigCache(self.zk_client, self.wake_event.set) self.unparsed_config_cache = UnparsedConfigCache(self.zk_client) @@ -2538,5 +2538,4 @@ class Scheduler(threading.Thread): tenant.semaphore_handler.release(item, job) def createZKContext(self, lock, log): - return ZKContext(self.zk_client, lock, self.stop_event, log, - self.component_registry) + return ZKContext(self.zk_client, lock, self.stop_event, log) diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py index bb69480e7f..0b7facd9f6 100755 --- a/zuul/web/__init__.py +++ b/zuul/web/__init__.py @@ -59,7 +59,7 @@ from zuul.model import ( ) from zuul.version import get_version_string from zuul.zk import ZooKeeperClient -from zuul.zk.components import ComponentRegistry, WebComponent +from zuul.zk.components import COMPONENT_REGISTRY, WebComponent from zuul.zk.config_cache import SystemConfigCache from zuul.zk.event_queues import ( TenantManagementEventQueue, @@ -269,7 +269,7 @@ class LogStreamHandler(WebSocket): try: port_location = streamer_utils.getJobLogStreamAddress( - self.zuulweb.executor_api, self.zuulweb.component_registry, + self.zuulweb.executor_api, request['uuid'], source_zone=self.zuulweb.zone) except exceptions.StreamingError as e: return self.logClose(4011, str(e)) @@ -1723,7 +1723,7 @@ class ZuulWeb(object): self.component_info) self.monitoring_server.start() - self.component_registry = ComponentRegistry(self.zk_client) + self.component_registry = COMPONENT_REGISTRY.create(self.zk_client) self.system_config_cache_wake_event = threading.Event() self.system_config_cache = SystemConfigCache( @@ -1763,8 +1763,7 @@ class ZuulWeb(object): self.zk_client ) - self.zk_context = ZKContext(self.zk_client, None, None, self.log, - self.component_registry) + self.zk_context = ZKContext(self.zk_client, None, None, self.log) command_socket = get_default( self.config, 'web', 'command_socket', diff --git a/zuul/zk/branch_cache.py b/zuul/zk/branch_cache.py index 950d470a8e..56930093b8 100644 --- a/zuul/zk/branch_cache.py +++ b/zuul/zk/branch_cache.py @@ -94,8 +94,7 @@ class BranchCache: # TODO: standardize on a stop event for connections and add it # to the context. - self.zk_context = ZKContext(zk_client, self.wlock, None, self.log, - component_registry) + self.zk_context = ZKContext(zk_client, self.wlock, None, self.log) with locked(self.wlock): try: diff --git a/zuul/zk/components.py b/zuul/zk/components.py index 94bf416c3e..6554297e15 100644 --- a/zuul/zk/components.py +++ b/zuul/zk/components.py @@ -27,6 +27,26 @@ from zuul import model COMPONENTS_ROOT = "/zuul/components" +class GlobalRegistry: + def __init__(self): + self.registry = None + + def create(self, zk_client): + if not self.registry: + self.registry = ComponentRegistry(zk_client) + return self.registry + + def clearRegistry(self): + self.registry = None + + @property + def model_api(self): + return self.registry.model_api + + +COMPONENT_REGISTRY = GlobalRegistry() + + class BaseComponent(ZooKeeperBase): """ Read/write component object. diff --git a/zuul/zk/semaphore.py b/zuul/zk/semaphore.py index 9e72cf8963..04a4f256ea 100644 --- a/zuul/zk/semaphore.py +++ b/zuul/zk/semaphore.py @@ -21,6 +21,7 @@ from kazoo.exceptions import BadVersionError, NoNodeError from zuul.lib.logutil import get_annotated_logger from zuul.zk import ZooKeeperSimpleBase +from zuul.zk.components import COMPONENT_REGISTRY def holdersFromData(data): @@ -38,12 +39,10 @@ class SemaphoreHandler(ZooKeeperSimpleBase): semaphore_root = "/zuul/semaphores" - def __init__(self, client, statsd, tenant_name, layout, - component_registry): + def __init__(self, client, statsd, tenant_name, layout): super().__init__(client) self.layout = layout self.statsd = statsd - self.component_registry = component_registry self.tenant_name = tenant_name self.tenant_root = f"{self.semaphore_root}/{tenant_name}" @@ -110,7 +109,7 @@ class SemaphoreHandler(ZooKeeperSimpleBase): # semaphore is there, check max while len(semaphore_holders) < self._max_count(semaphore.name): # MODEL_API: >1 - if self.component_registry.model_api > 1: + if COMPONENT_REGISTRY.model_api > 1: semaphore_holders.append(semaphore_handle) else: semaphore_holders.append(legacy_handle) @@ -212,10 +211,10 @@ class SemaphoreHandler(ZooKeeperSimpleBase): def cleanupLeaks(self): # MODEL_API: >1 - if self.component_registry.model_api < 2: + if COMPONENT_REGISTRY.model_api < 2: self.log.warning("Skipping semaphore cleanup since minimum model " "API is %s (needs >= 2)", - self.component_registry.model_api) + COMPONENT_REGISTRY.model_api) return for semaphore_name in self.getSemaphores(): diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py index 92b0cfcee5..45eb2b8e22 100644 --- a/zuul/zk/zkobject.py +++ b/zuul/zk/zkobject.py @@ -23,25 +23,19 @@ from kazoo.exceptions import ( from zuul.zk import sharding from zuul.zk.exceptions import InvalidObjectError -from zuul import model class ZKContext: - def __init__(self, zk_client, lock, stop_event, log, registry): + def __init__(self, zk_client, lock, stop_event, log): self.client = zk_client.client self.lock = lock self.stop_event = stop_event self.log = log - self.registry = registry def sessionIsValid(self): return ((not self.lock or self.lock.is_still_valid()) and (not self.stop_event or not self.stop_event.is_set())) - @property - def model_api(self): - return self.registry.model_api - class LocalZKContext: """A Local ZKContext that means don't actually write anything to ZK""" @@ -51,15 +45,10 @@ class LocalZKContext: self.lock = None self.stop_event = None self.log = log - self.registry = None def sessionIsValid(self): return True - @property - def model_api(self): - return model.MODEL_API - class ZKObject: _retry_interval = 5