Refactor connection configuration

There are some arguments to the connection configuration method
which are used to exclude various types of connectinons since
not all Zuul components need (or should use) all of the connections.

These arguments are a bit haphazard due to the way they were added
over time, and we're adding more connection types that may not
always need to be initialized in the form of providers.

To make it easier to reason about what connections should be
initialized, the existing arguments are replaced with a set of
explicit requests for driver interfaces, all of which default to
False so that each component will state which things it needs.

Change-Id: I197764ec5ca8cccfc6f98ff5f7576d92a9f191fc
This commit is contained in:
James E. Blair
2024-07-11 14:50:57 -07:00
parent 55806d76b2
commit 12ddfb4ab3
9 changed files with 46 additions and 30 deletions

View File

@@ -1490,7 +1490,8 @@ class ZuulWebFixture(fixtures.Fixture):
config, test_config,
additional_event_queues, upstream_root,
poller_events, git_url_with_auth, add_cleanup)
self.connections.configure(config)
self.connections.configure(config, database=True, sources=True,
triggers=True, reporters=True)
self.authenticators = zuul.lib.auth.AuthenticatorRegistry()
self.authenticators.configure(config)
@@ -1889,7 +1890,9 @@ class SchedulerTestApp:
git_url_with_auth,
add_cleanup,
)
self.connections.configure(self.config)
self.connections.configure(self.config, database=True,
sources=True, triggers=True,
reporters=True, providers=True)
self.sched = TestScheduler(self.config, self.connections, self,
wait_for_init, disable_pipelines)
@@ -2261,8 +2264,7 @@ class ZuulTestCase(BaseTestCase):
self.additional_event_queues,
self.upstream_root, self.poller_events,
self.git_url_with_auth, self.addCleanup)
executor_connections.configure(self.config,
source_only=True)
executor_connections.configure(self.config, sources=True)
self.executor_api = TestingExecutorApi(self.zk_client)
self.merger_api = TestingMergerApi(self.zk_client)
self.executor_server = RecordingExecutorServer(

View File

@@ -100,8 +100,7 @@ class TestSchedulerZone(ZuulTestCase):
self.config, self.test_config, self.additional_event_queues,
self.upstream_root, self.poller_events,
self.git_url_with_auth, self.addCleanup)
executor_connections.configure(self.config,
source_only=True)
executor_connections.configure(self.config, sources=True)
self.executor_server_unzoned = RecordingExecutorServer(
config,
connections=executor_connections,

View File

@@ -211,11 +211,14 @@ class ZuulApp(object):
logging_config.setDebug()
logging_config.apply()
def configure_connections(self, source_only=False,
require_sql=False, check_bwrap=False):
def configure_connections(self, database=False, sources=False,
triggers=False, reporters=False,
providers=False, check_bwrap=False):
self.connections = zuul.lib.connections.ConnectionRegistry(
check_bwrap=check_bwrap)
self.connections.configure(self.config, source_only, require_sql)
self.connections.configure(self.config, database=database,
sources=sources, triggers=triggers,
reporters=reporters, providers=providers)
class ZuulDaemonApp(ZuulApp, metaclass=abc.ABCMeta):

View File

@@ -908,7 +908,7 @@ class Client(zuul.cmd.ZuulApp):
def validate(self):
from zuul import scheduler
from zuul import configloader
self.configure_connections(source_only=True)
self.configure_connections(sources=True, triggers=True, reporters=True)
class SchedulerConfig(scheduler.Scheduler):
# A custom scheduler constructor adapted for config check
@@ -1066,7 +1066,7 @@ class Client(zuul.cmd.ZuulApp):
args = self.args
now = datetime.datetime.now(dateutil.tz.tzutc())
cutoff = parse_cutoff(now, args.before, args.older_than)
self.configure_connections(source_only=False, require_sql=True)
self.configure_connections(database=True)
connection = self.connections.getSqlConnection()
connection.deleteBuildsets(cutoff, args.batch_size)
sys.exit(0)

View File

@@ -85,7 +85,7 @@ class Executor(zuul.cmd.ZuulDaemonApp):
self.setup_logging('executor', 'log_config')
self.log = logging.getLogger("zuul.Executor")
self.configure_connections(source_only=True, check_bwrap=True)
self.configure_connections(sources=True, check_bwrap=True)
if self.config.has_option('executor', 'job_dir'):
self.job_dir = os.path.expanduser(

View File

@@ -37,7 +37,7 @@ class Merger(zuul.cmd.ZuulDaemonApp):
def run(self):
self.handleCommands()
self.configure_connections(source_only=True)
self.configure_connections(sources=True)
self.setup_logging('merger', 'log_config')

View File

@@ -90,7 +90,8 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
self.setup_logging('scheduler', 'log_config')
self.log = logging.getLogger("zuul.Scheduler")
self.configure_connections(require_sql=True)
self.configure_connections(database=True, sources=True, triggers=True,
reporters=True, providers=True)
self.sched = zuul.scheduler.Scheduler(self.config,
self.connections, self,
self.args.wait_for_init,

View File

@@ -87,7 +87,8 @@ class WebServer(zuul.cmd.ZuulDaemonApp):
self.log = logging.getLogger("zuul.WebServer")
try:
self.configure_connections(require_sql=True)
self.configure_connections(database=True, sources=True,
triggers=True, reporters=True)
self.configure_authenticators()
self._run()
except Exception:

View File

@@ -33,7 +33,12 @@ import zuul.driver.gitlab
import zuul.driver.elasticsearch
import zuul.driver.aws
from zuul.connection import BaseConnection
from zuul.driver import SourceInterface
from zuul.driver import (
ProviderInterface,
ReporterInterface,
SourceInterface,
TriggerInterface,
)
class DefaultConnection(BaseConnection):
@@ -91,11 +96,12 @@ class ConnectionRegistry(object):
for driver in self.drivers.values():
driver.stop()
def configure(self, config, source_only=False, require_sql=False):
def configure(self, config, database=False, sources=False,
triggers=False, reporters=False, providers=False):
# Register connections from the config
connections = OrderedDict()
if 'database' in config.sections() and not source_only:
if database and 'database' in config.sections():
driver = self.drivers['sql']
con_config = dict(config.items('database'))
@@ -121,14 +127,14 @@ class ConnectionRegistry(object):
driver = self.drivers[con_driver]
# The merger and the reporter only needs source driver.
# This makes sure Reporter like the SQLDriver are only created by
# the scheduler process
if source_only and not isinstance(driver, SourceInterface):
continue
connection = driver.getConnection(con_name, con_config)
connections[con_name] = connection
if (database and driver.name == 'sql' or
sources and isinstance(driver, SourceInterface) or
triggers and isinstance(driver, TriggerInterface) or
reporters and isinstance(driver, ReporterInterface) or
providers and isinstance(driver, ProviderInterface)):
# Only create connections for the requested drivers
connection = driver.getConnection(con_name, con_config)
connections[con_name] = connection
# If the [gerrit] or [smtp] sections still exist, load them in as a
# connection named 'gerrit' or 'smtp' respectfully
@@ -157,13 +163,17 @@ class ConnectionRegistry(object):
# Create default connections for drivers which need no
# connection information (e.g., 'timer' or 'zuul').
if not source_only:
for driver in self.drivers.values():
if not hasattr(driver, 'getConnection'):
for driver in self.drivers.values():
if not hasattr(driver, 'getConnection'):
if (database and driver.name == 'sql' or
sources and isinstance(driver, SourceInterface) or
triggers and isinstance(driver, TriggerInterface) or
reporters and isinstance(driver, ReporterInterface) or
providers and isinstance(driver, ProviderInterface)):
connections[driver.name] = DefaultConnection(
driver, driver.name, {})
if require_sql:
if database:
if 'database' not in connections:
raise Exception("Database configuration is required")