diff --git a/tests/base.py b/tests/base.py index da7b665385..233621f3d9 100644 --- a/tests/base.py +++ b/tests/base.py @@ -93,7 +93,7 @@ from zuul.driver.elasticsearch import ElasticsearchDriver from zuul.lib.collections import DefaultKeyDict from zuul.lib.connections import ConnectionRegistry from zuul.zk import zkobject, ZooKeeperClient -from zuul.zk.components import SchedulerComponent +from zuul.zk.components import SchedulerComponent, COMPONENT_REGISTRY from zuul.zk.event_queues import ConnectionEventQueue from zuul.zk.executor import ExecutorApi from zuul.zk.locks import tenant_read_lock, pipeline_lock, SessionAwareLock @@ -4032,6 +4032,15 @@ class PrometheusFixture(fixtures.Fixture): prometheus_client.registry.REGISTRY.unregister(collector) +class GlobalRegistryFixture(fixtures.Fixture): + def _setUp(self): + self.addCleanup(self._cleanup) + + def _cleanup(self): + # Remove our component registry from the global + COMPONENT_REGISTRY.clearRegistry() + + class FakeCPUTimes: def __init__(self): self.user = 0 @@ -4072,6 +4081,7 @@ class BaseTestCase(testtools.TestCase): def setUp(self): super(BaseTestCase, self).setUp() self.useFixture(PrometheusFixture()) + self.useFixture(GlobalRegistryFixture()) test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0) try: test_timeout = int(test_timeout) @@ -4600,8 +4610,7 @@ class ZuulTestCase(BaseTestCase): self._context_lock.acquire(blocking=False) lock = self._context_lock return zkobject.ZKContext(self.zk_client, lock, - None, self.log, - self.scheds.first.sched.component_registry) + None, self.log) def __event_queues(self, matcher) -> List[Queue]: # TODO (swestphahl): Can be removed when we no longer use global diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py index 2a8db82dcb..a0d726e4b8 100644 --- a/tests/unit/test_model_upgrade.py +++ b/tests/unit/test_model_upgrade.py @@ -119,6 +119,7 @@ class TestSemaphoreModelUpgrade(ZuulTestCase): semaphore_holders = tenant.semaphore_handler.semaphoreHolders( "test-semaphore") + self.log.debug("Semaphore holders: %s", repr(semaphore_holders)) self.assertEqual(len(semaphore_holders), 1) # Assert that we are still using the old-style handler format self.assertTrue(all(isinstance(h, str) for h in semaphore_holders)) @@ -177,6 +178,7 @@ class TestSemaphoreModelUpgrade(ZuulTestCase): tenant.semaphore_handler.acquire(item, job, False) semaphore_holders = tenant.semaphore_handler.semaphoreHolders( "test-semaphore") + self.log.debug("Semaphore holders: %s", repr(semaphore_holders)) self.assertEqual(len(semaphore_holders), 1) # Assert that we are now using the new-style handler format self.assertTrue(all(isinstance(h, dict) for h in semaphore_holders)) diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py index b3237072ec..9da8218cda 100644 --- a/tests/unit/test_zk.py +++ b/tests/unit/test_zk.py @@ -1505,8 +1505,7 @@ class TestZKObject(ZooKeeperBaseTestCase): # Create a new object tenant_name = 'fake_tenant' with tenant_write_lock(self.zk_client, tenant_name) as lock: - context = ZKContext(self.zk_client, lock, stop_event, self.log, - self.component_registry) + context = ZKContext(self.zk_client, lock, stop_event, self.log) pipeline1 = zkobject_class.new(context, name=tenant_name, foo='bar') @@ -1517,8 +1516,7 @@ class TestZKObject(ZooKeeperBaseTestCase): # Load an object from ZK (that we don't already have) with tenant_write_lock(self.zk_client, tenant_name) as lock: - context = ZKContext(self.zk_client, lock, stop_event, self.log, - self.component_registry) + context = ZKContext(self.zk_client, lock, stop_event, self.log) pipeline2 = zkobject_class.fromZK(context, '/zuul/pipeline/fake_tenant') self.assertEqual(pipeline2.foo, 'bar') @@ -1532,8 +1530,7 @@ class TestZKObject(ZooKeeperBaseTestCase): # Update an object with tenant_write_lock(self.zk_client, tenant_name) as lock: - context = ZKContext(self.zk_client, lock, stop_event, self.log, - self.component_registry) + context = ZKContext(self.zk_client, lock, stop_event, self.log) ltime1 = get_ltime(pipeline1) pipeline1.updateAttributes(context, foo='qux') self.assertEqual(pipeline1.foo, 'qux') @@ -1547,8 +1544,7 @@ class TestZKObject(ZooKeeperBaseTestCase): # Update an object using an active context with tenant_write_lock(self.zk_client, tenant_name) as lock: - context = ZKContext(self.zk_client, lock, stop_event, self.log, - self.component_registry) + context = ZKContext(self.zk_client, lock, stop_event, self.log) ltime1 = get_ltime(pipeline1) with pipeline1.activeContext(context): pipeline1.foo = 'baz' @@ -1569,15 +1565,13 @@ class TestZKObject(ZooKeeperBaseTestCase): # Refresh an existing object with tenant_write_lock(self.zk_client, tenant_name) as lock: - context = ZKContext(self.zk_client, lock, stop_event, self.log, - self.component_registry) + context = ZKContext(self.zk_client, lock, stop_event, self.log) pipeline2.refresh(context) self.assertEqual(pipeline2.foo, 'baz') # Delete an object with tenant_write_lock(self.zk_client, tenant_name) as lock: - context = ZKContext(self.zk_client, lock, stop_event, self.log, - self.component_registry) + context = ZKContext(self.zk_client, lock, stop_event, self.log) self.assertIsNotNone(self.zk_client.client.exists( '/zuul/pipeline/fake_tenant')) pipeline2.delete(context) @@ -1620,8 +1614,7 @@ class TestZKObject(ZooKeeperBaseTestCase): # Fail an update with tenant_write_lock(self.zk_client, tenant_name) as lock: - context = ZKContext(self.zk_client, lock, stop_event, self.log, - self.component_registry) + context = ZKContext(self.zk_client, lock, stop_event, self.log) pipeline1 = zkobject_class.new(context, name=tenant_name, foo='one') @@ -1815,8 +1808,7 @@ class TestConfigurationErrorList(ZooKeeperBaseTestCase): # Create a new object with tenant_write_lock(self.zk_client, 'test') as lock: - context = ZKContext(self.zk_client, lock, stop_event, self.log, - self.component_registry) + context = ZKContext(self.zk_client, lock, stop_event, self.log) pipeline = DummyZKObject.new(context, name="test", foo="bar") e1 = model.ConfigurationError( source_context, start_mark, "Test error1") diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py index dc8d4772c2..fc1ea6a2ea 100755 --- a/zuul/cmd/client.py +++ b/zuul/cmd/client.py @@ -37,7 +37,7 @@ from zuul.lib.keystorage import KeyStorage from zuul.zk.locks import tenant_write_lock from zuul.zk.zkobject import ZKContext from zuul.zk.layout import LayoutState, LayoutStateStore -from zuul.zk.components import ComponentRegistry +from zuul.zk.components import COMPONENT_REGISTRY # todo This should probably live somewhere else @@ -963,15 +963,14 @@ class Client(zuul.cmd.ZuulApp): args = self.args safe_tenant = urllib.parse.quote_plus(args.tenant) safe_pipeline = urllib.parse.quote_plus(args.pipeline) - component_registry = ComponentRegistry(zk_client) + COMPONENT_REGISTRY.create(zk_client) with tenant_write_lock(zk_client, args.tenant) as lock: path = f'/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}' layout_uuid = None zk_client.client.delete( f'/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}', recursive=True) - context = ZKContext(zk_client, lock, None, self.log, - component_registry) + context = ZKContext(zk_client, lock, None, self.log) ps = PipelineState.new(context, _path=path, layout_uuid=layout_uuid) # Force everyone to make a new layout for this tenant in diff --git a/zuul/configloader.py b/zuul/configloader.py index 7ed296d837..d5adf0542a 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -1640,8 +1640,7 @@ class TenantParser(object): tenant, parsed_config, loading_errors, layout_uuid) if self.scheduler: tenant.semaphore_handler = SemaphoreHandler( - self.zk_client, self.statsd, tenant.name, tenant.layout, - self.scheduler.component_registry + self.zk_client, self.statsd, tenant.name, tenant.layout ) return tenant diff --git a/zuul/executor/server.py b/zuul/executor/server.py index 3330e96415..acb2e3cc71 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -72,7 +72,7 @@ import zuul.model from zuul.nodepool import Nodepool from zuul.version import get_version_string from zuul.zk.event_queues import PipelineResultEventQueue -from zuul.zk.components import ExecutorComponent, ComponentRegistry +from zuul.zk.components import ExecutorComponent, COMPONENT_REGISTRY from zuul.zk.exceptions import JobRequestNotFound from zuul.zk.executor import ExecutorApi from zuul.zk.job_request_queue import JobRequestEvent @@ -3195,9 +3195,8 @@ class ExecutorServer(BaseMergeServer): self.component_info = ExecutorComponent( self.zk_client, self.hostname, version=get_version_string()) self.component_info.register() - self.component_registry = ComponentRegistry(self.zk_client) - self.zk_context = ZKContext(self.zk_client, None, None, self.log, - self.component_registry) + COMPONENT_REGISTRY.create(self.zk_client) + self.zk_context = ZKContext(self.zk_client, None, None, self.log) self.monitoring_server = MonitoringServer(self.config, 'executor', self.component_info) self.monitoring_server.start() diff --git a/zuul/lib/fingergw.py b/zuul/lib/fingergw.py index dd2218b262..90eb032ea5 100644 --- a/zuul/lib/fingergw.py +++ b/zuul/lib/fingergw.py @@ -28,7 +28,7 @@ from zuul.lib.config import get_default from zuul.lib.monitoring import MonitoringServer from zuul.version import get_version_string from zuul.zk import ZooKeeperClient -from zuul.zk.components import ComponentRegistry, FingerGatewayComponent +from zuul.zk.components import COMPONENT_REGISTRY, FingerGatewayComponent from zuul.zk.executor import ExecutorApi COMMANDS = [ @@ -88,7 +88,7 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler): try: build_uuid = self.getCommand() port_location = streamer_utils.getJobLogStreamAddress( - self.fingergw.executor_api, self.fingergw.component_registry, + self.fingergw.executor_api, build_uuid, source_zone=self.fingergw.zone) if not port_location: @@ -190,13 +190,12 @@ class FingerGateway(object): if self.tls_listen: self.component_info.use_ssl = True self.component_info.register() + COMPONENT_REGISTRY.create(self.zk_client) self.monitoring_server = MonitoringServer(config, 'fingergw', self.component_info) self.monitoring_server.start() - self.component_registry = ComponentRegistry(self.zk_client) - self.executor_api = ExecutorApi(self.zk_client, use_cache=False) def _runCommand(self): diff --git a/zuul/lib/streamer_utils.py b/zuul/lib/streamer_utils.py index 1882dd64a1..5ebdcc6b86 100644 --- a/zuul/lib/streamer_utils.py +++ b/zuul/lib/streamer_utils.py @@ -30,6 +30,7 @@ import threading import time from zuul.exceptions import StreamingError +from zuul.zk.components import COMPONENT_REGISTRY log = logging.getLogger("zuul.lib.streamer_utils") @@ -175,8 +176,7 @@ class CustomThreadingTCPServer(socketserver.ThreadingTCPServer): return sock, addr -def getJobLogStreamAddress(executor_api, component_registry, uuid, - source_zone): +def getJobLogStreamAddress(executor_api, uuid, source_zone): """ Looks up the log stream address for the given build UUID. @@ -199,7 +199,7 @@ def getJobLogStreamAddress(executor_api, component_registry, uuid, job_log_stream_address = {} if worker_zone and source_zone != worker_zone: - info = _getFingerGatewayInZone(component_registry, worker_zone) + info = _getFingerGatewayInZone(worker_zone) if info: job_log_stream_address['server'] = info.hostname job_log_stream_address['port'] = info.public_port @@ -225,8 +225,9 @@ def getJobLogStreamAddress(executor_api, component_registry, uuid, return job_log_stream_address -def _getFingerGatewayInZone(component_registry, zone): - gws = [gw for gw in component_registry.all('fingergw') if gw.zone == zone] +def _getFingerGatewayInZone(zone): + registry = COMPONENT_REGISTRY.registry + gws = [gw for gw in registry.all('fingergw') if gw.zone == zone] if gws: return random.choice(gws) return None diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index b0677e4c33..070065ba1f 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -26,6 +26,7 @@ from zuul.model import ( Change, DequeueEvent, PipelineState, PipelineChangeList, QueueItem, ) from zuul.zk.change_cache import ChangeKey +from zuul.zk.components import COMPONENT_REGISTRY from zuul.zk.locks import pipeline_lock @@ -731,7 +732,7 @@ class PipelineManager(metaclass=ABCMeta): continue # MODEL_API: >2 - if self.sched.component_registry.model_api > 2: + if COMPONENT_REGISTRY.model_api > 2: event = model.SupercedeEvent( other_pipeline.tenant.name, other_pipeline.name, diff --git a/zuul/scheduler.py b/zuul/scheduler.py index e7cd3c0eff..7f204616aa 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -81,7 +81,7 @@ from zuul.zk.cleanup import ( NodeRequestCleanupLock, ) from zuul.zk.components import ( - BaseComponent, ComponentRegistry, SchedulerComponent + BaseComponent, COMPONENT_REGISTRY, SchedulerComponent ) from zuul.zk.config_cache import SystemConfigCache, UnparsedConfigCache from zuul.zk.event_queues import ( @@ -236,7 +236,7 @@ class Scheduler(threading.Thread): self.component_info = SchedulerComponent( self.zk_client, self.hostname, version=self.zuul_version) self.component_info.register() - self.component_registry = ComponentRegistry(self.zk_client) + self.component_registry = COMPONENT_REGISTRY.create(self.zk_client) self.system_config_cache = SystemConfigCache(self.zk_client, self.wake_event.set) self.unparsed_config_cache = UnparsedConfigCache(self.zk_client) @@ -2538,5 +2538,4 @@ class Scheduler(threading.Thread): tenant.semaphore_handler.release(item, job) def createZKContext(self, lock, log): - return ZKContext(self.zk_client, lock, self.stop_event, log, - self.component_registry) + return ZKContext(self.zk_client, lock, self.stop_event, log) diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py index bb69480e7f..0b7facd9f6 100755 --- a/zuul/web/__init__.py +++ b/zuul/web/__init__.py @@ -59,7 +59,7 @@ from zuul.model import ( ) from zuul.version import get_version_string from zuul.zk import ZooKeeperClient -from zuul.zk.components import ComponentRegistry, WebComponent +from zuul.zk.components import COMPONENT_REGISTRY, WebComponent from zuul.zk.config_cache import SystemConfigCache from zuul.zk.event_queues import ( TenantManagementEventQueue, @@ -269,7 +269,7 @@ class LogStreamHandler(WebSocket): try: port_location = streamer_utils.getJobLogStreamAddress( - self.zuulweb.executor_api, self.zuulweb.component_registry, + self.zuulweb.executor_api, request['uuid'], source_zone=self.zuulweb.zone) except exceptions.StreamingError as e: return self.logClose(4011, str(e)) @@ -1723,7 +1723,7 @@ class ZuulWeb(object): self.component_info) self.monitoring_server.start() - self.component_registry = ComponentRegistry(self.zk_client) + self.component_registry = COMPONENT_REGISTRY.create(self.zk_client) self.system_config_cache_wake_event = threading.Event() self.system_config_cache = SystemConfigCache( @@ -1763,8 +1763,7 @@ class ZuulWeb(object): self.zk_client ) - self.zk_context = ZKContext(self.zk_client, None, None, self.log, - self.component_registry) + self.zk_context = ZKContext(self.zk_client, None, None, self.log) command_socket = get_default( self.config, 'web', 'command_socket', diff --git a/zuul/zk/branch_cache.py b/zuul/zk/branch_cache.py index 950d470a8e..56930093b8 100644 --- a/zuul/zk/branch_cache.py +++ b/zuul/zk/branch_cache.py @@ -94,8 +94,7 @@ class BranchCache: # TODO: standardize on a stop event for connections and add it # to the context. - self.zk_context = ZKContext(zk_client, self.wlock, None, self.log, - component_registry) + self.zk_context = ZKContext(zk_client, self.wlock, None, self.log) with locked(self.wlock): try: diff --git a/zuul/zk/components.py b/zuul/zk/components.py index 94bf416c3e..6554297e15 100644 --- a/zuul/zk/components.py +++ b/zuul/zk/components.py @@ -27,6 +27,26 @@ from zuul import model COMPONENTS_ROOT = "/zuul/components" +class GlobalRegistry: + def __init__(self): + self.registry = None + + def create(self, zk_client): + if not self.registry: + self.registry = ComponentRegistry(zk_client) + return self.registry + + def clearRegistry(self): + self.registry = None + + @property + def model_api(self): + return self.registry.model_api + + +COMPONENT_REGISTRY = GlobalRegistry() + + class BaseComponent(ZooKeeperBase): """ Read/write component object. diff --git a/zuul/zk/semaphore.py b/zuul/zk/semaphore.py index 9e72cf8963..04a4f256ea 100644 --- a/zuul/zk/semaphore.py +++ b/zuul/zk/semaphore.py @@ -21,6 +21,7 @@ from kazoo.exceptions import BadVersionError, NoNodeError from zuul.lib.logutil import get_annotated_logger from zuul.zk import ZooKeeperSimpleBase +from zuul.zk.components import COMPONENT_REGISTRY def holdersFromData(data): @@ -38,12 +39,10 @@ class SemaphoreHandler(ZooKeeperSimpleBase): semaphore_root = "/zuul/semaphores" - def __init__(self, client, statsd, tenant_name, layout, - component_registry): + def __init__(self, client, statsd, tenant_name, layout): super().__init__(client) self.layout = layout self.statsd = statsd - self.component_registry = component_registry self.tenant_name = tenant_name self.tenant_root = f"{self.semaphore_root}/{tenant_name}" @@ -110,7 +109,7 @@ class SemaphoreHandler(ZooKeeperSimpleBase): # semaphore is there, check max while len(semaphore_holders) < self._max_count(semaphore.name): # MODEL_API: >1 - if self.component_registry.model_api > 1: + if COMPONENT_REGISTRY.model_api > 1: semaphore_holders.append(semaphore_handle) else: semaphore_holders.append(legacy_handle) @@ -212,10 +211,10 @@ class SemaphoreHandler(ZooKeeperSimpleBase): def cleanupLeaks(self): # MODEL_API: >1 - if self.component_registry.model_api < 2: + if COMPONENT_REGISTRY.model_api < 2: self.log.warning("Skipping semaphore cleanup since minimum model " "API is %s (needs >= 2)", - self.component_registry.model_api) + COMPONENT_REGISTRY.model_api) return for semaphore_name in self.getSemaphores(): diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py index 92b0cfcee5..45eb2b8e22 100644 --- a/zuul/zk/zkobject.py +++ b/zuul/zk/zkobject.py @@ -23,25 +23,19 @@ from kazoo.exceptions import ( from zuul.zk import sharding from zuul.zk.exceptions import InvalidObjectError -from zuul import model class ZKContext: - def __init__(self, zk_client, lock, stop_event, log, registry): + def __init__(self, zk_client, lock, stop_event, log): self.client = zk_client.client self.lock = lock self.stop_event = stop_event self.log = log - self.registry = registry def sessionIsValid(self): return ((not self.lock or self.lock.is_still_valid()) and (not self.stop_event or not self.stop_event.is_set())) - @property - def model_api(self): - return self.registry.model_api - class LocalZKContext: """A Local ZKContext that means don't actually write anything to ZK""" @@ -51,15 +45,10 @@ class LocalZKContext: self.lock = None self.stop_event = None self.log = log - self.registry = None def sessionIsValid(self): return True - @property - def model_api(self): - return model.MODEL_API - class ZKObject: _retry_interval = 5