Merge "Reorganize connections into drivers" into feature/zuulv3
This commit is contained in:
commit
9672473d87
|
@ -45,8 +45,8 @@ import statsd
|
|||
import testtools
|
||||
from git.exc import NoSuchPathError
|
||||
|
||||
import zuul.connection.gerrit
|
||||
import zuul.connection.smtp
|
||||
import zuul.driver.gerrit.gerritsource as gerritsource
|
||||
import zuul.driver.gerrit.gerritconnection as gerritconnection
|
||||
import zuul.scheduler
|
||||
import zuul.webapp
|
||||
import zuul.rpclistener
|
||||
|
@ -58,12 +58,6 @@ import zuul.merger.client
|
|||
import zuul.merger.merger
|
||||
import zuul.merger.server
|
||||
import zuul.nodepool
|
||||
import zuul.reporter.gerrit
|
||||
import zuul.reporter.smtp
|
||||
import zuul.source.gerrit
|
||||
import zuul.trigger.gerrit
|
||||
import zuul.trigger.timer
|
||||
import zuul.trigger.zuultrigger
|
||||
import zuul.zk
|
||||
|
||||
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
|
||||
|
@ -388,7 +382,7 @@ class FakeChange(object):
|
|||
self.reported += 1
|
||||
|
||||
|
||||
class FakeGerritConnection(zuul.connection.gerrit.GerritConnection):
|
||||
class FakeGerritConnection(gerritconnection.GerritConnection):
|
||||
"""A Fake Gerrit connection for use in tests.
|
||||
|
||||
This subclasses
|
||||
|
@ -398,9 +392,9 @@ class FakeGerritConnection(zuul.connection.gerrit.GerritConnection):
|
|||
|
||||
log = logging.getLogger("zuul.test.FakeGerritConnection")
|
||||
|
||||
def __init__(self, connection_name, connection_config,
|
||||
def __init__(self, driver, connection_name, connection_config,
|
||||
changes_db=None, upstream_root=None):
|
||||
super(FakeGerritConnection, self).__init__(connection_name,
|
||||
super(FakeGerritConnection, self).__init__(driver, connection_name,
|
||||
connection_config)
|
||||
|
||||
self.event_queue = Queue.Queue()
|
||||
|
@ -1225,14 +1219,15 @@ class ZuulTestCase(BaseTestCase):
|
|||
|
||||
self.config.set('gearman', 'port', str(self.gearman_server.port))
|
||||
|
||||
zuul.source.gerrit.GerritSource.replication_timeout = 1.5
|
||||
zuul.source.gerrit.GerritSource.replication_retry_interval = 0.5
|
||||
zuul.connection.gerrit.GerritEventConnector.delay = 0.0
|
||||
gerritsource.GerritSource.replication_timeout = 1.5
|
||||
gerritsource.GerritSource.replication_retry_interval = 0.5
|
||||
gerritconnection.GerritEventConnector.delay = 0.0
|
||||
|
||||
self.sched = zuul.scheduler.Scheduler(self.config)
|
||||
|
||||
self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
|
||||
FakeSwiftClientConnection))
|
||||
|
||||
self.swift = zuul.lib.swift.Swift(self.config)
|
||||
|
||||
self.event_queues = [
|
||||
|
@ -1294,7 +1289,25 @@ class ZuulTestCase(BaseTestCase):
|
|||
self.assertFinalState()
|
||||
|
||||
def configure_connections(self):
|
||||
# Register connections from the config
|
||||
# Set up gerrit related fakes
|
||||
# Set a changes database so multiple FakeGerrit's can report back to
|
||||
# a virtual canonical database given by the configured hostname
|
||||
self.gerrit_changes_dbs = {}
|
||||
|
||||
def getGerritConnection(driver, name, config):
|
||||
db = self.gerrit_changes_dbs.setdefault(config['server'], {})
|
||||
con = FakeGerritConnection(driver, name, config,
|
||||
changes_db=db,
|
||||
upstream_root=self.upstream_root)
|
||||
self.event_queues.append(con.event_queue)
|
||||
setattr(self, 'fake_' + name, con)
|
||||
return con
|
||||
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'zuul.driver.gerrit.GerritDriver.getConnection',
|
||||
getGerritConnection))
|
||||
|
||||
# Set up smtp related fakes
|
||||
self.smtp_messages = []
|
||||
|
||||
def FakeSMTPFactory(*args, **kw):
|
||||
|
@ -1303,60 +1316,9 @@ class ZuulTestCase(BaseTestCase):
|
|||
|
||||
self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
|
||||
|
||||
# Set a changes database so multiple FakeGerrit's can report back to
|
||||
# a virtual canonical database given by the configured hostname
|
||||
self.gerrit_changes_dbs = {}
|
||||
# Register connections from the config using fakes
|
||||
self.connections = zuul.lib.connections.ConnectionRegistry()
|
||||
|
||||
for section_name in self.config.sections():
|
||||
con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$',
|
||||
section_name, re.I)
|
||||
if not con_match:
|
||||
continue
|
||||
con_name = con_match.group(2)
|
||||
con_config = dict(self.config.items(section_name))
|
||||
|
||||
if 'driver' not in con_config:
|
||||
raise Exception("No driver specified for connection %s."
|
||||
% con_name)
|
||||
|
||||
con_driver = con_config['driver']
|
||||
|
||||
# TODO(jhesketh): load the required class automatically
|
||||
if con_driver == 'gerrit':
|
||||
if con_config['server'] not in self.gerrit_changes_dbs.keys():
|
||||
self.gerrit_changes_dbs[con_config['server']] = {}
|
||||
self.connections.connections[con_name] = FakeGerritConnection(
|
||||
con_name, con_config,
|
||||
changes_db=self.gerrit_changes_dbs[con_config['server']],
|
||||
upstream_root=self.upstream_root
|
||||
)
|
||||
self.event_queues.append(
|
||||
self.connections.connections[con_name].event_queue)
|
||||
setattr(self, 'fake_' + con_name,
|
||||
self.connections.connections[con_name])
|
||||
elif con_driver == 'smtp':
|
||||
self.connections.connections[con_name] = \
|
||||
zuul.connection.smtp.SMTPConnection(con_name, con_config)
|
||||
else:
|
||||
raise Exception("Unknown driver, %s, for connection %s"
|
||||
% (con_config['driver'], con_name))
|
||||
|
||||
# If the [gerrit] or [smtp] sections still exist, load them in as a
|
||||
# connection named 'gerrit' or 'smtp' respectfully
|
||||
|
||||
if 'gerrit' in self.config.sections():
|
||||
self.gerrit_changes_dbs['gerrit'] = {}
|
||||
self.event_queues.append(
|
||||
self.connections.connections[con_name].event_queue)
|
||||
self.connections.connections['gerrit'] = FakeGerritConnection(
|
||||
'_legacy_gerrit', dict(self.config.items('gerrit')),
|
||||
changes_db=self.gerrit_changes_dbs['gerrit'])
|
||||
|
||||
if 'smtp' in self.config.sections():
|
||||
self.connections.connections['smtp'] = \
|
||||
zuul.connection.smtp.SMTPConnection(
|
||||
'_legacy_smtp', dict(self.config.items('smtp')))
|
||||
self.connections.configure(self.config)
|
||||
|
||||
def setup_config(self):
|
||||
# This creates the per-test configuration object. It can be
|
||||
|
|
|
@ -12,22 +12,9 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import testtools
|
||||
|
||||
import zuul.connection.gerrit
|
||||
|
||||
from tests.base import ZuulTestCase
|
||||
|
||||
|
||||
class TestGerritConnection(testtools.TestCase):
|
||||
log = logging.getLogger("zuul.test_connection")
|
||||
|
||||
def test_driver_name(self):
|
||||
self.assertEqual('gerrit',
|
||||
zuul.connection.gerrit.GerritConnection.driver_name)
|
||||
|
||||
|
||||
class TestConnections(ZuulTestCase):
|
||||
config_file = 'zuul-connections-same-gerrit.conf'
|
||||
tenant_config_file = 'config/zuul-connections-same-gerrit/main.yaml'
|
||||
|
|
|
@ -21,7 +21,7 @@ except ImportError:
|
|||
import mock
|
||||
|
||||
from tests.base import BaseTestCase
|
||||
from zuul.connection.gerrit import GerritConnection
|
||||
from zuul.driver.gerrit.gerritconnection import GerritConnection
|
||||
|
||||
FIXTURE_DIR = os.path.join(os.path.dirname(__file__), 'fixtures/gerrit')
|
||||
|
||||
|
@ -46,13 +46,13 @@ def read_fixtures(files):
|
|||
|
||||
class TestGerrit(BaseTestCase):
|
||||
|
||||
@mock.patch('zuul.connection.gerrit.GerritConnection._ssh')
|
||||
@mock.patch('zuul.driver.gerrit.gerritconnection.GerritConnection._ssh')
|
||||
def run_query(self, files, expected_patches, _ssh_mock):
|
||||
gerrit_config = {
|
||||
'user': 'gerrit',
|
||||
'server': 'localhost',
|
||||
}
|
||||
gerrit = GerritConnection('review_gerrit', gerrit_config)
|
||||
gerrit = GerritConnection(None, 'review_gerrit', gerrit_config)
|
||||
|
||||
calls, values = read_fixtures(files)
|
||||
_ssh_mock.side_effect = values
|
||||
|
|
|
@ -1,46 +0,0 @@
|
|||
# Copyright 2014 Rackspace Australia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import testtools
|
||||
|
||||
import zuul.reporter
|
||||
|
||||
|
||||
class TestSMTPReporter(testtools.TestCase):
|
||||
log = logging.getLogger("zuul.test_reporter")
|
||||
|
||||
def setUp(self):
|
||||
super(TestSMTPReporter, self).setUp()
|
||||
|
||||
def test_reporter_abc(self):
|
||||
# We only need to instantiate a class for this
|
||||
reporter = zuul.reporter.smtp.SMTPReporter({}) # noqa
|
||||
|
||||
def test_reporter_name(self):
|
||||
self.assertEqual('smtp', zuul.reporter.smtp.SMTPReporter.name)
|
||||
|
||||
|
||||
class TestGerritReporter(testtools.TestCase):
|
||||
log = logging.getLogger("zuul.test_reporter")
|
||||
|
||||
def setUp(self):
|
||||
super(TestGerritReporter, self).setUp()
|
||||
|
||||
def test_reporter_abc(self):
|
||||
# We only need to instantiate a class for this
|
||||
reporter = zuul.reporter.gerrit.GerritReporter(None) # noqa
|
||||
|
||||
def test_reporter_name(self):
|
||||
self.assertEqual('gerrit', zuul.reporter.gerrit.GerritReporter.name)
|
|
@ -29,8 +29,6 @@ import testtools
|
|||
import zuul.change_matcher
|
||||
import zuul.scheduler
|
||||
import zuul.rpcclient
|
||||
import zuul.reporter.gerrit
|
||||
import zuul.reporter.smtp
|
||||
import zuul.model
|
||||
|
||||
from tests.base import (
|
||||
|
|
|
@ -1,51 +0,0 @@
|
|||
# Copyright 2014 Rackspace Australia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import testtools
|
||||
|
||||
import zuul.trigger
|
||||
|
||||
|
||||
class TestGerritTrigger(testtools.TestCase):
|
||||
log = logging.getLogger("zuul.test_trigger")
|
||||
|
||||
def test_trigger_abc(self):
|
||||
# We only need to instantiate a class for this
|
||||
zuul.trigger.gerrit.GerritTrigger({})
|
||||
|
||||
def test_trigger_name(self):
|
||||
self.assertEqual('gerrit', zuul.trigger.gerrit.GerritTrigger.name)
|
||||
|
||||
|
||||
class TestTimerTrigger(testtools.TestCase):
|
||||
log = logging.getLogger("zuul.test_trigger")
|
||||
|
||||
def test_trigger_abc(self):
|
||||
# We only need to instantiate a class for this
|
||||
zuul.trigger.timer.TimerTrigger({})
|
||||
|
||||
def test_trigger_name(self):
|
||||
self.assertEqual('timer', zuul.trigger.timer.TimerTrigger.name)
|
||||
|
||||
|
||||
class TestZuulTrigger(testtools.TestCase):
|
||||
log = logging.getLogger("zuul.test_trigger")
|
||||
|
||||
def test_trigger_abc(self):
|
||||
# We only need to instantiate a class for this
|
||||
zuul.trigger.zuultrigger.ZuulTrigger({})
|
||||
|
||||
def test_trigger_name(self):
|
||||
self.assertEqual('zuul', zuul.trigger.zuultrigger.ZuulTrigger.name)
|
|
@ -308,37 +308,17 @@ class PipelineParser(object):
|
|||
|
||||
@staticmethod
|
||||
def getDriverSchema(dtype, connections):
|
||||
# TODO(jhesketh): Make the driver discovery dynamic
|
||||
connection_drivers = {
|
||||
'trigger': {
|
||||
'gerrit': 'zuul.trigger.gerrit',
|
||||
},
|
||||
'reporter': {
|
||||
'gerrit': 'zuul.reporter.gerrit',
|
||||
'smtp': 'zuul.reporter.smtp',
|
||||
},
|
||||
}
|
||||
standard_drivers = {
|
||||
'trigger': {
|
||||
'timer': 'zuul.trigger.timer',
|
||||
'zuul': 'zuul.trigger.zuultrigger',
|
||||
}
|
||||
methods = {
|
||||
'trigger': 'getTriggerSchema',
|
||||
'reporter': 'getReporterSchema',
|
||||
}
|
||||
|
||||
schema = {}
|
||||
# Add the configured connections as available layout options
|
||||
for connection_name, connection in connections.connections.items():
|
||||
for dname, dmod in connection_drivers.get(dtype, {}).items():
|
||||
if connection.driver_name == dname:
|
||||
schema[connection_name] = to_list(__import__(
|
||||
connection_drivers[dtype][dname],
|
||||
fromlist=['']).getSchema())
|
||||
|
||||
# Standard drivers are always available and don't require a unique
|
||||
# (connection) name
|
||||
for dname, dmod in standard_drivers.get(dtype, {}).items():
|
||||
schema[dname] = to_list(__import__(
|
||||
standard_drivers[dtype][dname], fromlist=['']).getSchema())
|
||||
method = getattr(connection.driver, methods[dtype], None)
|
||||
if method:
|
||||
schema[connection_name] = to_list(method())
|
||||
|
||||
return schema
|
||||
|
||||
|
|
|
@ -34,12 +34,13 @@ class BaseConnection(object):
|
|||
into. For example, a trigger will likely require some kind of query method
|
||||
while a reporter may need a review method."""
|
||||
|
||||
def __init__(self, connection_name, connection_config):
|
||||
def __init__(self, driver, connection_name, connection_config):
|
||||
# connection_name is the name given to this connection in zuul.ini
|
||||
# connection_config is a dictionary of config_section from zuul.ini for
|
||||
# this connection.
|
||||
# __init__ shouldn't make the actual connection in case this connection
|
||||
# isn't used in the layout.
|
||||
self.driver = driver
|
||||
self.connection_name = connection_name
|
||||
self.connection_config = connection_config
|
||||
|
||||
|
|
|
@ -0,0 +1,167 @@
|
|||
# Copyright 2016 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
class Driver(object):
|
||||
"""A Zuul Driver.
|
||||
|
||||
A Driver is an extension component of Zuul that supports
|
||||
interfacing with a remote system. It can support any of the
|
||||
following interfaces:
|
||||
|
||||
* Connection
|
||||
* Source
|
||||
* Trigger
|
||||
* Reporter
|
||||
|
||||
Drivers supporting each of these interfaces must implement some of
|
||||
the following methods, as appropriate.
|
||||
|
||||
Zuul will create a single instance of each Driver (which will be
|
||||
shared by all tenants), and this instance will persist for the
|
||||
life of the process. The Driver class may therefore manage any
|
||||
global state used by all connections.
|
||||
|
||||
The class or instance attribute **name** must be provided as a string.
|
||||
|
||||
"""
|
||||
|
||||
name = None
|
||||
|
||||
def getConnection(self, name, config):
|
||||
"""Create and return a new Connection object.
|
||||
|
||||
Required if this driver implements the Connection interface.
|
||||
|
||||
This method will be called once for each connection specified
|
||||
in zuul.conf. The resultant object should be responsible for
|
||||
establishing any long-lived connections to remote systems. If
|
||||
Zuul is reconfigured, all existing connections will be stopped
|
||||
and this method will be called again for any new connections
|
||||
which should be created.
|
||||
|
||||
When a connection is specified in zuul.conf with a name, that
|
||||
name is used here when creating the connection, and it is also
|
||||
used in the layout to attach triggers and reporters to the
|
||||
named connection. If the Driver does not utilize a connection
|
||||
(because it does not interact with a remote system), do not
|
||||
implement this method and Zuul will automatically associate
|
||||
triggers and reporters with the name of the Driver itself
|
||||
where it would normally expect the name of a connection.
|
||||
|
||||
:arg str name: The name of the connection. This is the name
|
||||
supplied in the zuul.conf file where the connection is
|
||||
configured.
|
||||
:arg dict config: The configuration information supplied along
|
||||
with the connection in zuul.conf.
|
||||
|
||||
:returns: A new Connection object.
|
||||
:rtype: Connection
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def getTrigger(self, connection, config=None):
|
||||
"""Create and return a new Connection object.
|
||||
|
||||
Required if this driver implements the Trigger interface.
|
||||
|
||||
:arg Connection connection: The Connection object associated
|
||||
with the trigger (as previously returned by getConnection)
|
||||
or None.
|
||||
:arg dict config: The configuration information supplied along
|
||||
with the trigger in the layout.
|
||||
|
||||
:returns: A new Trigger object.
|
||||
:rtype: Trigger
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def getSource(self, connection):
|
||||
"""Create and return a new Source object.
|
||||
|
||||
Required if this driver implements the Source interface.
|
||||
|
||||
:arg Connection connection: The Connection object associated
|
||||
with the source (as previously returned by getConnection).
|
||||
|
||||
:returns: A new Source object.
|
||||
:rtype: Source
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def getReporter(self, connection, config=None):
|
||||
"""Create and return a new Reporter object.
|
||||
|
||||
Required if this driver implements the Reporter interface.
|
||||
|
||||
:arg Connection connection: The Connection object associated
|
||||
with the reporter (as previously returned by getConnection)
|
||||
or None.
|
||||
:arg dict config: The configuration information supplied along
|
||||
with the reporter in the layout.
|
||||
|
||||
:returns: A new Reporter object.
|
||||
:rtype: Reporter
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def getTriggerSchema(self):
|
||||
"""Get the schema for this driver's trigger.
|
||||
|
||||
Required if this driver implements the Trigger interface.
|
||||
|
||||
:returns: A voluptuous schema.
|
||||
:rtype: dict or Schema
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def getReporterSchema(self):
|
||||
"""Get the schema for this driver's reporter.
|
||||
|
||||
Required if this driver implements the Reporter interface.
|
||||
|
||||
:returns: A voluptuous schema.
|
||||
:rtype: dict or Schema
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def reconfigure(self, tenant):
|
||||
"""Called when a tenant is reconfigured.
|
||||
|
||||
When Zuul performs a reconfiguration for a tenant, this method
|
||||
is called with the tenant (including the new layout
|
||||
configuration) as an argument. The driver may establish any
|
||||
global resources needed by the tenant at this point.
|
||||
|
||||
:arg Tenant tenant: The tenant which has been reconfigured.
|
||||
|
||||
"""
|
||||
pass
|
||||
|
||||
def registerScheduler(self, scheduler):
|
||||
"""Register the scheduler with the driver.
|
||||
|
||||
This method is called once during initialization to allow the
|
||||
driver to store a handle to the running scheduler.
|
||||
|
||||
:arg Scheduler scheduler: The current running scheduler.
|
||||
|
||||
"""
|
||||
pass
|
|
@ -0,0 +1,40 @@
|
|||
# Copyright 2016 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import gerritconnection
|
||||
import gerrittrigger
|
||||
import gerritsource
|
||||
import gerritreporter
|
||||
|
||||
|
||||
class GerritDriver(object):
|
||||
name = 'gerrit'
|
||||
|
||||
def getConnection(self, name, config):
|
||||
return gerritconnection.GerritConnection(self, name, config)
|
||||
|
||||
def getTrigger(self, connection, config=None):
|
||||
return gerrittrigger.GerritTrigger(self, connection, config)
|
||||
|
||||
def getSource(self, connection):
|
||||
return gerritsource.GerritSource(self, connection)
|
||||
|
||||
def getReporter(self, connection, config=None):
|
||||
return gerritreporter.GerritReporter(self, connection, config)
|
||||
|
||||
def getTriggerSchema(self):
|
||||
return gerrittrigger.getSchema()
|
||||
|
||||
def getReporterSchema(self):
|
||||
return gerritreporter.getSchema()
|
|
@ -228,8 +228,8 @@ class GerritConnection(BaseConnection):
|
|||
replication_timeout = 300
|
||||
replication_retry_interval = 5
|
||||
|
||||
def __init__(self, connection_name, connection_config):
|
||||
super(GerritConnection, self).__init__(connection_name,
|
||||
def __init__(self, driver, connection_name, connection_config):
|
||||
super(GerritConnection, self).__init__(driver, connection_name,
|
||||
connection_config)
|
||||
if 'server' not in self.connection_config:
|
||||
raise Exception('server is required for gerrit connections in '
|
||||
|
@ -760,12 +760,12 @@ class GerritConnection(BaseConnection):
|
|||
return url
|
||||
|
||||
def onLoad(self):
|
||||
self.log.debug("Starting Gerrit Conncetion/Watchers")
|
||||
self.log.debug("Starting Gerrit Connection/Watchers")
|
||||
self._start_watcher_thread()
|
||||
self._start_event_connector()
|
||||
|
||||
def onStop(self):
|
||||
self.log.debug("Stopping Gerrit Conncetion/Watchers")
|
||||
self.log.debug("Stopping Gerrit Connection/Watchers")
|
||||
self._stop_watcher_thread()
|
||||
self._stop_event_connector()
|
||||
|
|
@ -30,13 +30,13 @@ class GerritReporter(BaseReporter):
|
|||
message = self._formatItemReport(pipeline, item)
|
||||
|
||||
self.log.debug("Report change %s, params %s, message: %s" %
|
||||
(item.change, self.reporter_config, message))
|
||||
(item.change, self.config, message))
|
||||
changeid = '%s,%s' % (item.change.number, item.change.patchset)
|
||||
item.change._ref_sha = source.getRefSha(
|
||||
item.change.project.name, 'refs/heads/' + item.change.branch)
|
||||
|
||||
return self.connection.review(item.change.project.name, changeid,
|
||||
message, self.reporter_config)
|
||||
message, self.config)
|
||||
|
||||
def getSubmitAllowNeeds(self):
|
||||
"""Get a list of code review labels that are allowed to be
|
||||
|
@ -44,7 +44,7 @@ class GerritReporter(BaseReporter):
|
|||
to this queue. In other words, the list of review labels
|
||||
this reporter itself is likely to set before submitting.
|
||||
"""
|
||||
return self.reporter_config
|
||||
return self.config
|
||||
|
||||
|
||||
def getSchema():
|
|
@ -1,4 +1,4 @@
|
|||
# Copyright 2014 Rackspace Australia
|
||||
# Copyright 2016 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
|
@ -12,14 +12,18 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import testtools
|
||||
|
||||
import zuul.source
|
||||
import smtpconnection
|
||||
import smtpreporter
|
||||
|
||||
|
||||
class TestGerritSource(testtools.TestCase):
|
||||
log = logging.getLogger("zuul.test_source")
|
||||
class SMTPDriver(object):
|
||||
name = 'smtp'
|
||||
|
||||
def test_source_name(self):
|
||||
self.assertEqual('gerrit', zuul.source.gerrit.GerritSource.name)
|
||||
def getConnection(self, name, config):
|
||||
return smtpconnection.SMTPConnection(self, name, config)
|
||||
|
||||
def getReporter(self, connection, config=None):
|
||||
return smtpreporter.SMTPReporter(self, connection, config)
|
||||
|
||||
def getReporterSchema(self):
|
||||
return smtpreporter.getSchema()
|
|
@ -25,9 +25,8 @@ class SMTPConnection(BaseConnection):
|
|||
driver_name = 'smtp'
|
||||
log = logging.getLogger("connection.smtp")
|
||||
|
||||
def __init__(self, connection_name, connection_config):
|
||||
|
||||
super(SMTPConnection, self).__init__(connection_name,
|
||||
def __init__(self, driver, connection_name, connection_config):
|
||||
super(SMTPConnection, self).__init__(driver, connection_name,
|
||||
connection_config)
|
||||
|
||||
self.smtp_server = self.connection_config.get(
|
|
@ -29,15 +29,15 @@ class SMTPReporter(BaseReporter):
|
|||
message = self._formatItemReport(pipeline, item)
|
||||
|
||||
self.log.debug("Report change %s, params %s, message: %s" %
|
||||
(item.change, self.reporter_config, message))
|
||||
(item.change, self.config, message))
|
||||
|
||||
from_email = self.reporter_config['from'] \
|
||||
if 'from' in self.reporter_config else None
|
||||
to_email = self.reporter_config['to'] \
|
||||
if 'to' in self.reporter_config else None
|
||||
from_email = self.config['from'] \
|
||||
if 'from' in self.config else None
|
||||
to_email = self.config['to'] \
|
||||
if 'to' in self.config else None
|
||||
|
||||
if 'subject' in self.reporter_config:
|
||||
subject = self.reporter_config['subject'].format(
|
||||
if 'subject' in self.config:
|
||||
subject = self.config['subject'].format(
|
||||
change=item.change)
|
||||
else:
|
||||
subject = "Report for change %s" % item.change
|
|
@ -0,0 +1,94 @@
|
|||
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
# Copyright 2016 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
|
||||
from zuul.model import TriggerEvent
|
||||
import timertrigger
|
||||
|
||||
|
||||
class TimerDriver(object):
|
||||
name = 'timer'
|
||||
|
||||
log = logging.getLogger("zuul.Timer")
|
||||
|
||||
def __init__(self):
|
||||
self.apsched = BackgroundScheduler()
|
||||
self.apsched.start()
|
||||
self.tenant_jobs = {}
|
||||
|
||||
def registerScheduler(self, scheduler):
|
||||
self.sched = scheduler
|
||||
|
||||
def reconfigure(self, tenant):
|
||||
self._removeJobs(tenant)
|
||||
self._addJobs(tenant)
|
||||
|
||||
def _removeJobs(self, tenant):
|
||||
jobs = self.tenant_jobs.get(tenant.name, [])
|
||||
for job in jobs:
|
||||
job.remove()
|
||||
|
||||
def _addJobs(self, tenant):
|
||||
jobs = []
|
||||
self.tenant_jobs[tenant.name] = jobs
|
||||
for pipeline in tenant.layout.pipelines:
|
||||
for ef in pipeline.manager.event_filters:
|
||||
if not isinstance(ef.trigger, timertrigger.TimerTrigger):
|
||||
continue
|
||||
for timespec in ef.timespecs:
|
||||
parts = timespec.split()
|
||||
if len(parts) < 5 or len(parts) > 6:
|
||||
self.log.error(
|
||||
"Unable to parse time value '%s' "
|
||||
"defined in pipeline %s" % (
|
||||
timespec,
|
||||
pipeline.name))
|
||||
continue
|
||||
minute, hour, dom, month, dow = parts[:5]
|
||||
if len(parts) > 5:
|
||||
second = parts[5]
|
||||
else:
|
||||
second = None
|
||||
trigger = CronTrigger(day=dom, day_of_week=dow, hour=hour,
|
||||
minute=minute, second=second)
|
||||
|
||||
job = self.apsched.add_job(
|
||||
self._onTrigger, trigger=trigger,
|
||||
args=(tenant, pipeline.name, timespec,))
|
||||
jobs.append(job)
|
||||
|
||||
def _onTrigger(self, tenant, pipeline_name, timespec):
|
||||
for project in tenant.layout.projects.values():
|
||||
event = TriggerEvent()
|
||||
event.type = 'timer'
|
||||
event.timespec = timespec
|
||||
event.forced_pipeline = pipeline_name
|
||||
event.project_name = project.name
|
||||
self.log.debug("Adding event %s" % event)
|
||||
self.sched.addEvent(event)
|
||||
|
||||
def stop(self):
|
||||
self.apsched.shutdown()
|
||||
|
||||
def getTrigger(self, connection_name):
|
||||
return timertrigger.TimerTrigger(self)
|
||||
|
||||
def getTriggerSchema(self):
|
||||
return timertrigger.getSchema()
|
|
@ -0,0 +1,46 @@
|
|||
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import voluptuous as v
|
||||
|
||||
from zuul.model import EventFilter
|
||||
from zuul.trigger import BaseTrigger
|
||||
|
||||
|
||||
class TimerTrigger(BaseTrigger):
|
||||
name = 'timer'
|
||||
|
||||
def getEventFilters(self, trigger_conf):
|
||||
def toList(item):
|
||||
if not item:
|
||||
return []
|
||||
if isinstance(item, list):
|
||||
return item
|
||||
return [item]
|
||||
|
||||
efilters = []
|
||||
for trigger in toList(trigger_conf):
|
||||
f = EventFilter(trigger=self,
|
||||
types=['timer'],
|
||||
timespecs=toList(trigger['time']))
|
||||
|
||||
efilters.append(f)
|
||||
|
||||
return efilters
|
||||
|
||||
|
||||
def getSchema():
|
||||
timer_trigger = {v.Required('time'): str}
|
||||
return timer_trigger
|
|
@ -0,0 +1,111 @@
|
|||
# Copyright 2016 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from zuul.model import TriggerEvent
|
||||
|
||||
import zuultrigger
|
||||
|
||||
PARENT_CHANGE_ENQUEUED = 'parent-change-enqueued'
|
||||
PROJECT_CHANGE_MERGED = 'project-change-merged'
|
||||
|
||||
|
||||
class ZuulDriver(object):
|
||||
name = 'zuul'
|
||||
log = logging.getLogger("zuul.ZuulTrigger")
|
||||
|
||||
def __init__(self):
|
||||
self.tenant_events = {}
|
||||
|
||||
def registerScheduler(self, scheduler):
|
||||
self.sched = scheduler
|
||||
|
||||
def reconfigure(self, tenant):
|
||||
events = set()
|
||||
self.tenant_events[tenant.name] = events
|
||||
for pipeline in tenant.layout.pipelines.values():
|
||||
for ef in pipeline.manager.event_filters:
|
||||
if not isinstance(ef.trigger, zuultrigger.ZuulTrigger):
|
||||
continue
|
||||
if PARENT_CHANGE_ENQUEUED in ef._types:
|
||||
events.add(PARENT_CHANGE_ENQUEUED)
|
||||
elif PROJECT_CHANGE_MERGED in ef._types:
|
||||
events.add(PROJECT_CHANGE_MERGED)
|
||||
|
||||
def onChangeMerged(self, tenant, change, source):
|
||||
# Called each time zuul merges a change
|
||||
if PROJECT_CHANGE_MERGED in self.tenant_events[tenant.name]:
|
||||
try:
|
||||
self._createProjectChangeMergedEvents(change, source)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Unable to create project-change-merged events for "
|
||||
"%s" % (change,))
|
||||
|
||||
def onChangeEnqueued(self, tenant, change, pipeline):
|
||||
self.log.debug("onChangeEnqueued %s", self.tenant_events[tenant.name])
|
||||
# Called each time a change is enqueued in a pipeline
|
||||
if PARENT_CHANGE_ENQUEUED in self.tenant_events[tenant.name]:
|
||||
try:
|
||||
self._createParentChangeEnqueuedEvents(change, pipeline)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Unable to create parent-change-enqueued events for "
|
||||
"%s in %s" % (change, pipeline))
|
||||
|
||||
def _createProjectChangeMergedEvents(self, change, source):
|
||||
changes = source.getProjectOpenChanges(
|
||||
change.project)
|
||||
for open_change in changes:
|
||||
self._createProjectChangeMergedEvent(open_change)
|
||||
|
||||
def _createProjectChangeMergedEvent(self, change):
|
||||
event = TriggerEvent()
|
||||
event.type = PROJECT_CHANGE_MERGED
|
||||
event.trigger_name = self.name
|
||||
event.project_name = change.project.name
|
||||
event.change_number = change.number
|
||||
event.branch = change.branch
|
||||
event.change_url = change.url
|
||||
event.patch_number = change.patchset
|
||||
event.refspec = change.refspec
|
||||
self.sched.addEvent(event)
|
||||
|
||||
def _createParentChangeEnqueuedEvents(self, change, pipeline):
|
||||
self.log.debug("Checking for changes needing %s:" % change)
|
||||
if not hasattr(change, 'needed_by_changes'):
|
||||
self.log.debug(" Changeish does not support dependencies")
|
||||
return
|
||||
for needs in change.needed_by_changes:
|
||||
self._createParentChangeEnqueuedEvent(needs, pipeline)
|
||||
|
||||
def _createParentChangeEnqueuedEvent(self, change, pipeline):
|
||||
event = TriggerEvent()
|
||||
event.type = PARENT_CHANGE_ENQUEUED
|
||||
event.trigger_name = self.name
|
||||
event.pipeline_name = pipeline.name
|
||||
event.project_name = change.project.name
|
||||
event.change_number = change.number
|
||||
event.branch = change.branch
|
||||
event.change_url = change.url
|
||||
event.patch_number = change.patchset
|
||||
event.refspec = change.refspec
|
||||
self.sched.addEvent(event)
|
||||
|
||||
def getTrigger(self, connection_name, config=None):
|
||||
return zuultrigger.ZuulTrigger(self, config)
|
||||
|
||||
def getTriggerSchema(self):
|
||||
return zuultrigger.getSchema()
|
|
@ -0,0 +1,77 @@
|
|||
# Copyright 2012-2014 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import voluptuous as v
|
||||
from zuul.model import EventFilter
|
||||
from zuul.trigger import BaseTrigger
|
||||
|
||||
|
||||
class ZuulTrigger(BaseTrigger):
|
||||
name = 'zuul'
|
||||
log = logging.getLogger("zuul.ZuulTrigger")
|
||||
|
||||
def __init__(self, connection, config=None):
|
||||
super(ZuulTrigger, self).__init__(connection, config)
|
||||
self._handle_parent_change_enqueued_events = False
|
||||
self._handle_project_change_merged_events = False
|
||||
|
||||
def getEventFilters(self, trigger_conf):
|
||||
def toList(item):
|
||||
if not item:
|
||||
return []
|
||||
if isinstance(item, list):
|
||||
return item
|
||||
return [item]
|
||||
|
||||
efilters = []
|
||||
for trigger in toList(trigger_conf):
|
||||
f = EventFilter(
|
||||
trigger=self,
|
||||
types=toList(trigger['event']),
|
||||
pipelines=toList(trigger.get('pipeline')),
|
||||
required_approvals=(
|
||||
toList(trigger.get('require-approval'))
|
||||
),
|
||||
reject_approvals=toList(
|
||||
trigger.get('reject-approval')
|
||||
),
|
||||
)
|
||||
efilters.append(f)
|
||||
|
||||
return efilters
|
||||
|
||||
|
||||
def getSchema():
|
||||
def toList(x):
|
||||
return v.Any([x], x)
|
||||
|
||||
approval = v.Schema({'username': str,
|
||||
'email-filter': str,
|
||||
'email': str,
|
||||
'older-than': str,
|
||||
'newer-than': str,
|
||||
}, extra=True)
|
||||
|
||||
zuul_trigger = {
|
||||
v.Required('event'):
|
||||
toList(v.Any('parent-change-enqueued',
|
||||
'project-change-merged')),
|
||||
'pipeline': toList(str),
|
||||
'require-approval': toList(approval),
|
||||
'reject-approval': toList(approval),
|
||||
}
|
||||
|
||||
return zuul_trigger
|
|
@ -14,8 +14,14 @@
|
|||
|
||||
import re
|
||||
|
||||
import zuul.connection.gerrit
|
||||
import zuul.connection.smtp
|
||||
import zuul.driver.zuul
|
||||
import zuul.driver.gerrit
|
||||
import zuul.driver.smtp
|
||||
from zuul.connection import BaseConnection
|
||||
|
||||
|
||||
class DefaultConnection(BaseConnection):
|
||||
pass
|
||||
|
||||
|
||||
class ConnectionRegistry(object):
|
||||
|
@ -23,13 +29,31 @@ class ConnectionRegistry(object):
|
|||
|
||||
def __init__(self):
|
||||
self.connections = {}
|
||||
self.drivers = {}
|
||||
|
||||
self.registerDriver(zuul.driver.zuul.ZuulDriver())
|
||||
self.registerDriver(zuul.driver.gerrit.GerritDriver())
|
||||
self.registerDriver(zuul.driver.smtp.SMTPDriver())
|
||||
|
||||
def registerDriver(self, driver):
|
||||
if driver.name in self.drivers:
|
||||
raise Exception("Driver %s already registered" % driver.name)
|
||||
self.drivers[driver.name] = driver
|
||||
|
||||
def registerScheduler(self, sched, load=True):
|
||||
for driver_name, driver in self.drivers.items():
|
||||
if hasattr(driver, 'registerScheduler'):
|
||||
driver.registerScheduler(sched)
|
||||
for connection_name, connection in self.connections.items():
|
||||
connection.registerScheduler(sched)
|
||||
if load:
|
||||
connection.onLoad()
|
||||
|
||||
def reconfigureDrivers(self, tenant):
|
||||
for driver in self.drivers.values():
|
||||
if hasattr(driver, 'reconfigure'):
|
||||
driver.reconfigure(tenant)
|
||||
|
||||
def stop(self):
|
||||
for connection_name, connection in self.connections.items():
|
||||
connection.onStop()
|
||||
|
@ -52,79 +76,46 @@ class ConnectionRegistry(object):
|
|||
% con_name)
|
||||
|
||||
con_driver = con_config['driver']
|
||||
|
||||
# TODO(jhesketh): load the required class automatically
|
||||
if con_driver == 'gerrit':
|
||||
connections[con_name] = \
|
||||
zuul.connection.gerrit.GerritConnection(con_name,
|
||||
con_config)
|
||||
elif con_driver == 'smtp':
|
||||
connections[con_name] = \
|
||||
zuul.connection.smtp.SMTPConnection(con_name, con_config)
|
||||
else:
|
||||
if con_driver not in self.drivers:
|
||||
raise Exception("Unknown driver, %s, for connection %s"
|
||||
% (con_config['driver'], con_name))
|
||||
|
||||
driver = self.drivers[con_driver]
|
||||
connection = driver.getConnection(con_name, con_config)
|
||||
connections[con_name] = connection
|
||||
|
||||
# If the [gerrit] or [smtp] sections still exist, load them in as a
|
||||
# connection named 'gerrit' or 'smtp' respectfully
|
||||
|
||||
if 'gerrit' in config.sections():
|
||||
driver = self.drivers['gerrit']
|
||||
connections['gerrit'] = \
|
||||
zuul.connection.gerrit.GerritConnection(
|
||||
driver.getConnection(
|
||||
'gerrit', dict(config.items('gerrit')))
|
||||
|
||||
if 'smtp' in config.sections():
|
||||
driver = self.drivers['smtp']
|
||||
connections['smtp'] = \
|
||||
zuul.connection.smtp.SMTPConnection(
|
||||
driver.getConnection(
|
||||
'smtp', dict(config.items('smtp')))
|
||||
|
||||
# Create default connections for drivers which need no
|
||||
# connection information (e.g., 'timer' or 'zuul').
|
||||
for driver in self.drivers.values():
|
||||
if not hasattr(driver, 'getConnection'):
|
||||
connections[driver.name] = DefaultConnection(
|
||||
driver, driver.name, {})
|
||||
|
||||
self.connections = connections
|
||||
|
||||
def _getDriver(self, dtype, connection_name, driver_config={}):
|
||||
# Instantiate a driver such as a trigger, source or reporter
|
||||
# TODO(jhesketh): Make this list dynamic or use entrypoints etc.
|
||||
# Stevedore was not a good fit here due to the nature of triggers.
|
||||
# Specifically we don't want to load a trigger per a pipeline as one
|
||||
# trigger can listen to a stream (from gerrit, for example) and the
|
||||
# scheduler decides which eventfilter to use. As such we want to load
|
||||
# trigger+connection pairs uniquely.
|
||||
drivers = {
|
||||
'source': {
|
||||
'gerrit': 'zuul.source.gerrit:GerritSource',
|
||||
},
|
||||
'trigger': {
|
||||
'gerrit': 'zuul.trigger.gerrit:GerritTrigger',
|
||||
'timer': 'zuul.trigger.timer:TimerTrigger',
|
||||
'zuul': 'zuul.trigger.zuultrigger:ZuulTrigger',
|
||||
},
|
||||
'reporter': {
|
||||
'gerrit': 'zuul.reporter.gerrit:GerritReporter',
|
||||
'smtp': 'zuul.reporter.smtp:SMTPReporter',
|
||||
},
|
||||
}
|
||||
|
||||
# TODO(jhesketh): Check the connection_name exists
|
||||
if connection_name in self.connections.keys():
|
||||
driver_name = self.connections[connection_name].driver_name
|
||||
connection = self.connections[connection_name]
|
||||
else:
|
||||
# In some cases a driver may not be related to a connection. For
|
||||
# example, the 'timer' or 'zuul' triggers.
|
||||
driver_name = connection_name
|
||||
connection = None
|
||||
driver = drivers[dtype][driver_name].split(':')
|
||||
driver_instance = getattr(
|
||||
__import__(driver[0], fromlist=['']), driver[1])(
|
||||
driver_config, connection
|
||||
)
|
||||
|
||||
return driver_instance
|
||||
|
||||
def getSource(self, connection_name):
|
||||
return self._getDriver('source', connection_name)
|
||||
connection = self.connections[connection_name]
|
||||
return connection.driver.getSource(connection)
|
||||
|
||||
def getReporter(self, connection_name, driver_config={}):
|
||||
return self._getDriver('reporter', connection_name, driver_config)
|
||||
def getReporter(self, connection_name, config=None):
|
||||
connection = self.connections[connection_name]
|
||||
return connection.driver.getReporter(connection, config)
|
||||
|
||||
def getTrigger(self, connection_name, driver_config={}):
|
||||
return self._getDriver('trigger', connection_name, driver_config)
|
||||
def getTrigger(self, connection_name, config=None):
|
||||
connection = self.connections[connection_name]
|
||||
return connection.driver.getTrigger(connection, config)
|
||||
|
|
|
@ -328,8 +328,9 @@ class PipelineManager(object):
|
|||
self.reportStart(item)
|
||||
self.enqueueChangesBehind(change, quiet, ignore_requirements,
|
||||
change_queue)
|
||||
for trigger in self.sched.triggers.values():
|
||||
trigger.onChangeEnqueued(item.change, self.pipeline)
|
||||
zuul_driver = self.sched.connections.drivers['zuul']
|
||||
tenant = self.pipeline.layout.tenant
|
||||
zuul_driver.onChangeEnqueued(tenant, item.change, self.pipeline)
|
||||
return True
|
||||
|
||||
def dequeueItem(self, item):
|
||||
|
@ -683,8 +684,10 @@ class PipelineManager(object):
|
|||
self.log.debug("%s window size increased to %s" %
|
||||
(change_queue, change_queue.window))
|
||||
|
||||
for trigger in self.sched.triggers.values():
|
||||
trigger.onChangeMerged(item.change, self.pipeline.source)
|
||||
zuul_driver = self.sched.connections.drivers['zuul']
|
||||
tenant = self.pipeline.layout.tenant
|
||||
zuul_driver.onChangeMerged(tenant, item.change,
|
||||
self.pipeline.source)
|
||||
|
||||
def _reportItem(self, item):
|
||||
self.log.debug("Reporting change %s" % item.change)
|
||||
|
|
|
@ -27,9 +27,10 @@ class BaseReporter(object):
|
|||
|
||||
log = logging.getLogger("zuul.reporter.BaseReporter")
|
||||
|
||||
def __init__(self, reporter_config={}, connection=None):
|
||||
self.reporter_config = reporter_config
|
||||
def __init__(self, driver, connection, config=None):
|
||||
self.driver = driver
|
||||
self.connection = connection
|
||||
self.config = config or {}
|
||||
self._action = None
|
||||
|
||||
def setAction(self, action):
|
||||
|
|
|
@ -534,6 +534,8 @@ class Scheduler(threading.Thread):
|
|||
self._reenqueueTenant(old_tenant, tenant)
|
||||
# TODOv3(jeblair): update for tenants
|
||||
# self.maintainConnectionCache()
|
||||
self.connections.reconfigureDrivers(tenant)
|
||||
# TODOv3(jeblair): remove postconfig calls?
|
||||
for pipeline in tenant.layout.pipelines.values():
|
||||
pipeline.source.postConfig()
|
||||
for trigger in pipeline.triggers:
|
||||
|
|
|
@ -23,9 +23,10 @@ class BaseTrigger(object):
|
|||
|
||||
Defines the exact public methods that must be supplied."""
|
||||
|
||||
def __init__(self, trigger_config={}, connection=None):
|
||||
self.trigger_config = trigger_config
|
||||
def __init__(self, driver, connection, config=None):
|
||||
self.driver = driver
|
||||
self.connection = connection
|
||||
self.config = config or {}
|
||||
|
||||
@abc.abstractmethod
|
||||
def getEventFilters(self, trigger_conf):
|
||||
|
|
|
@ -1,93 +0,0 @@
|
|||
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
import logging
|
||||
import voluptuous as v
|
||||
from zuul.model import EventFilter, TriggerEvent
|
||||
from zuul.trigger import BaseTrigger
|
||||
|
||||
|
||||
class TimerTrigger(BaseTrigger):
|
||||
name = 'timer'
|
||||
log = logging.getLogger("zuul.Timer")
|
||||
|
||||
def __init__(self, trigger_config={}, connection=None):
|
||||
super(TimerTrigger, self).__init__(trigger_config, connection)
|
||||
self.apsched = BackgroundScheduler()
|
||||
self.apsched.start()
|
||||
|
||||
def _onTrigger(self, pipeline_name, timespec):
|
||||
for project in self.sched.layout.projects.values():
|
||||
event = TriggerEvent()
|
||||
event.type = 'timer'
|
||||
event.timespec = timespec
|
||||
event.forced_pipeline = pipeline_name
|
||||
event.project_name = project.name
|
||||
self.log.debug("Adding event %s" % event)
|
||||
self.connection.sched.addEvent(event)
|
||||
|
||||
def stop(self):
|
||||
self.apsched.shutdown()
|
||||
|
||||
def getEventFilters(self, trigger_conf):
|
||||
def toList(item):
|
||||
if not item:
|
||||
return []
|
||||
if isinstance(item, list):
|
||||
return item
|
||||
return [item]
|
||||
|
||||
efilters = []
|
||||
for trigger in toList(trigger_conf):
|
||||
f = EventFilter(trigger=self,
|
||||
types=['timer'],
|
||||
timespecs=toList(trigger['time']))
|
||||
|
||||
efilters.append(f)
|
||||
|
||||
return efilters
|
||||
|
||||
def postConfig(self, pipeline):
|
||||
for job in self.apsched.get_jobs():
|
||||
job.remove()
|
||||
for ef in pipeline.manager.event_filters:
|
||||
if ef.trigger != self:
|
||||
continue
|
||||
for timespec in ef.timespecs:
|
||||
parts = timespec.split()
|
||||
if len(parts) < 5 or len(parts) > 6:
|
||||
self.log.error(
|
||||
"Unable to parse time value '%s' "
|
||||
"defined in pipeline %s" % (
|
||||
timespec,
|
||||
pipeline.name))
|
||||
continue
|
||||
minute, hour, dom, month, dow = parts[:5]
|
||||
if len(parts) > 5:
|
||||
second = parts[5]
|
||||
else:
|
||||
second = None
|
||||
trigger = CronTrigger(day=dom, day_of_week=dow, hour=hour,
|
||||
minute=minute, second=second)
|
||||
|
||||
self.apsched.add_job(self._onTrigger, trigger=trigger,
|
||||
args=(pipeline.name, timespec,))
|
||||
|
||||
|
||||
def getSchema():
|
||||
timer_trigger = {v.Required('time'): str}
|
||||
return timer_trigger
|
|
@ -1,147 +0,0 @@
|
|||
# Copyright 2012-2014 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import voluptuous as v
|
||||
from zuul.model import EventFilter, TriggerEvent
|
||||
from zuul.trigger import BaseTrigger
|
||||
|
||||
|
||||
class ZuulTrigger(BaseTrigger):
|
||||
name = 'zuul'
|
||||
log = logging.getLogger("zuul.ZuulTrigger")
|
||||
|
||||
def __init__(self, trigger_config={}, connection=None):
|
||||
super(ZuulTrigger, self).__init__(trigger_config, connection)
|
||||
self._handle_parent_change_enqueued_events = False
|
||||
self._handle_project_change_merged_events = False
|
||||
|
||||
def getEventFilters(self, trigger_conf):
|
||||
def toList(item):
|
||||
if not item:
|
||||
return []
|
||||
if isinstance(item, list):
|
||||
return item
|
||||
return [item]
|
||||
|
||||
efilters = []
|
||||
for trigger in toList(trigger_conf):
|
||||
f = EventFilter(
|
||||
trigger=self,
|
||||
types=toList(trigger['event']),
|
||||
pipelines=toList(trigger.get('pipeline')),
|
||||
required_approvals=(
|
||||
toList(trigger.get('require-approval'))
|
||||
),
|
||||
reject_approvals=toList(
|
||||
trigger.get('reject-approval')
|
||||
),
|
||||
)
|
||||
efilters.append(f)
|
||||
|
||||
return efilters
|
||||
|
||||
def onChangeMerged(self, change, source):
|
||||
# Called each time zuul merges a change
|
||||
if self._handle_project_change_merged_events:
|
||||
try:
|
||||
self._createProjectChangeMergedEvents(change, source)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Unable to create project-change-merged events for "
|
||||
"%s" % (change,))
|
||||
|
||||
def onChangeEnqueued(self, change, pipeline):
|
||||
# Called each time a change is enqueued in a pipeline
|
||||
if self._handle_parent_change_enqueued_events:
|
||||
try:
|
||||
self._createParentChangeEnqueuedEvents(change, pipeline)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Unable to create parent-change-enqueued events for "
|
||||
"%s in %s" % (change, pipeline))
|
||||
|
||||
def _createProjectChangeMergedEvents(self, change, source):
|
||||
changes = source.getProjectOpenChanges(
|
||||
change.project)
|
||||
for open_change in changes:
|
||||
self._createProjectChangeMergedEvent(open_change)
|
||||
|
||||
def _createProjectChangeMergedEvent(self, change):
|
||||
event = TriggerEvent()
|
||||
event.type = 'project-change-merged'
|
||||
event.trigger_name = self.name
|
||||
event.project_name = change.project.name
|
||||
event.change_number = change.number
|
||||
event.branch = change.branch
|
||||
event.change_url = change.url
|
||||
event.patch_number = change.patchset
|
||||
event.refspec = change.refspec
|
||||
self.connection.sched.addEvent(event)
|
||||
|
||||
def _createParentChangeEnqueuedEvents(self, change, pipeline):
|
||||
self.log.debug("Checking for changes needing %s:" % change)
|
||||
if not hasattr(change, 'needed_by_changes'):
|
||||
self.log.debug(" Changeish does not support dependencies")
|
||||
return
|
||||
for needs in change.needed_by_changes:
|
||||
self._createParentChangeEnqueuedEvent(needs, pipeline)
|
||||
|
||||
def _createParentChangeEnqueuedEvent(self, change, pipeline):
|
||||
event = TriggerEvent()
|
||||
event.type = 'parent-change-enqueued'
|
||||
event.trigger_name = self.name
|
||||
event.pipeline_name = pipeline.name
|
||||
event.project_name = change.project.name
|
||||
event.change_number = change.number
|
||||
event.branch = change.branch
|
||||
event.change_url = change.url
|
||||
event.patch_number = change.patchset
|
||||
event.refspec = change.refspec
|
||||
self.connection.sched.addEvent(event)
|
||||
|
||||
def postConfig(self, pipeline):
|
||||
self._handle_parent_change_enqueued_events = False
|
||||
self._handle_project_change_merged_events = False
|
||||
for ef in pipeline.manager.event_filters:
|
||||
if ef.trigger != self:
|
||||
continue
|
||||
if 'parent-change-enqueued' in ef._types:
|
||||
self._handle_parent_change_enqueued_events = True
|
||||
elif 'project-change-merged' in ef._types:
|
||||
self._handle_project_change_merged_events = True
|
||||
|
||||
|
||||
def getSchema():
|
||||
def toList(x):
|
||||
return v.Any([x], x)
|
||||
|
||||
approval = v.Schema({'username': str,
|
||||
'email-filter': str,
|
||||
'email': str,
|
||||
'older-than': str,
|
||||
'newer-than': str,
|
||||
}, extra=True)
|
||||
|
||||
zuul_trigger = {
|
||||
v.Required('event'):
|
||||
toList(v.Any('parent-change-enqueued',
|
||||
'project-change-merged')),
|
||||
'pipeline': toList(str),
|
||||
'require-approval': toList(approval),
|
||||
'reject-approval': toList(approval),
|
||||
}
|
||||
|
||||
return zuul_trigger
|
Loading…
Reference in New Issue